mirror of
https://github.com/fluencelabs/redis
synced 2025-03-17 16:10:50 +00:00
Merge remote branch 'origin/unstable' into unstable
This commit is contained in:
commit
cf1eefa420
@ -565,16 +565,18 @@ werr:
|
||||
*/
|
||||
int rewriteAppendOnlyFileBackground(void) {
|
||||
pid_t childpid;
|
||||
long long start;
|
||||
|
||||
if (server.bgrewritechildpid != -1) return REDIS_ERR;
|
||||
if (server.ds_enabled != 0) {
|
||||
redisLog(REDIS_WARNING,"BGREWRITEAOF called with diskstore enabled: AOF is not supported when diskstore is enabled. Operation not performed.");
|
||||
return REDIS_ERR;
|
||||
}
|
||||
start = ustime();
|
||||
if ((childpid = fork()) == 0) {
|
||||
/* Child */
|
||||
char tmpfile[256];
|
||||
|
||||
/* Child */
|
||||
if (server.ipfd > 0) close(server.ipfd);
|
||||
if (server.sofd > 0) close(server.sofd);
|
||||
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
|
||||
@ -585,6 +587,7 @@ int rewriteAppendOnlyFileBackground(void) {
|
||||
}
|
||||
} else {
|
||||
/* Parent */
|
||||
server.stat_fork_time = ustime()-start;
|
||||
if (childpid == -1) {
|
||||
redisLog(REDIS_WARNING,
|
||||
"Can't rewrite append only file in background: fork: %s",
|
||||
|
@ -180,7 +180,7 @@ void decrRefCount(void *obj) {
|
||||
robj *o = obj;
|
||||
|
||||
if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0");
|
||||
if (--(o->refcount) == 0) {
|
||||
if (o->refcount == 1) {
|
||||
switch(o->type) {
|
||||
case REDIS_STRING: freeStringObject(o); break;
|
||||
case REDIS_LIST: freeListObject(o); break;
|
||||
@ -189,8 +189,9 @@ void decrRefCount(void *obj) {
|
||||
case REDIS_HASH: freeHashObject(o); break;
|
||||
default: redisPanic("Unknown object type"); break;
|
||||
}
|
||||
o->ptr = NULL; /* defensive programming. We'll see NULL in traces. */
|
||||
zfree(o);
|
||||
} else {
|
||||
o->refcount--;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -482,6 +482,7 @@ werr:
|
||||
|
||||
int rdbSaveBackground(char *filename) {
|
||||
pid_t childpid;
|
||||
long long start;
|
||||
|
||||
if (server.bgsavechildpid != -1 ||
|
||||
server.bgsavethread != (pthread_t) -1) return REDIS_ERR;
|
||||
@ -493,6 +494,7 @@ int rdbSaveBackground(char *filename) {
|
||||
return dsRdbSaveBackground(filename);
|
||||
}
|
||||
|
||||
start = ustime();
|
||||
if ((childpid = fork()) == 0) {
|
||||
int retval;
|
||||
|
||||
@ -503,6 +505,7 @@ int rdbSaveBackground(char *filename) {
|
||||
_exit((retval == REDIS_OK) ? 0 : 1);
|
||||
} else {
|
||||
/* Parent */
|
||||
server.stat_fork_time = ustime()-start;
|
||||
if (childpid == -1) {
|
||||
redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
|
||||
strerror(errno));
|
||||
|
@ -55,6 +55,7 @@ static struct config {
|
||||
int hostport;
|
||||
char *hostsocket;
|
||||
long repeat;
|
||||
long interval;
|
||||
int dbnum;
|
||||
int interactive;
|
||||
int shutdown;
|
||||
@ -87,9 +88,11 @@ static long long mstime(void) {
|
||||
|
||||
static void cliRefreshPrompt(void) {
|
||||
if (config.dbnum == 0)
|
||||
snprintf(config.prompt,sizeof(config.prompt),"redis> ");
|
||||
snprintf(config.prompt,sizeof(config.prompt),"redis %s:%d> ",
|
||||
config.hostip, config.hostport);
|
||||
else
|
||||
snprintf(config.prompt,sizeof(config.prompt),"redis:%d> ",config.dbnum);
|
||||
snprintf(config.prompt,sizeof(config.prompt),"redis %s:%d[%d]> ",
|
||||
config.hostip, config.hostport, config.dbnum);
|
||||
}
|
||||
|
||||
/*------------------------------------------------------------------------------
|
||||
@ -314,10 +317,9 @@ static int cliConnect(int force) {
|
||||
return REDIS_OK;
|
||||
}
|
||||
|
||||
static void cliPrintContextErrorAndExit() {
|
||||
static void cliPrintContextError() {
|
||||
if (context == NULL) return;
|
||||
fprintf(stderr,"Error: %s\n",context->errstr);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
static sds cliFormatReplyTTY(redisReply *r, char *prefix) {
|
||||
@ -436,7 +438,8 @@ static int cliReadReply(int output_raw_strings) {
|
||||
if (context->err == REDIS_ERR_EOF)
|
||||
return REDIS_ERR;
|
||||
}
|
||||
cliPrintContextErrorAndExit();
|
||||
cliPrintContextError();
|
||||
exit(1);
|
||||
return REDIS_ERR; /* avoid compiler warning */
|
||||
}
|
||||
|
||||
@ -462,10 +465,7 @@ static int cliSendCommand(int argc, char **argv, int repeat) {
|
||||
size_t *argvlen;
|
||||
int j, output_raw;
|
||||
|
||||
if (context == NULL) {
|
||||
printf("Not connected, please use: connect <host> <port>\n");
|
||||
return REDIS_OK;
|
||||
}
|
||||
if (context == NULL) return REDIS_ERR;
|
||||
|
||||
output_raw = 0;
|
||||
if (!strcasecmp(command,"info") ||
|
||||
@ -518,6 +518,8 @@ static int cliSendCommand(int argc, char **argv, int repeat) {
|
||||
cliRefreshPrompt();
|
||||
}
|
||||
}
|
||||
if (config.interval) usleep(config.interval);
|
||||
fflush(stdout); /* Make it grep friendly */
|
||||
}
|
||||
|
||||
free(argvlen);
|
||||
@ -553,6 +555,10 @@ static int parseOptions(int argc, char **argv) {
|
||||
} else if (!strcmp(argv[i],"-r") && !lastarg) {
|
||||
config.repeat = strtoll(argv[i+1],NULL,10);
|
||||
i++;
|
||||
} else if (!strcmp(argv[i],"-i") && !lastarg) {
|
||||
double seconds = atof(argv[i+1]);
|
||||
config.interval = seconds*1000000;
|
||||
i++;
|
||||
} else if (!strcmp(argv[i],"-n") && !lastarg) {
|
||||
config.dbnum = atoi(argv[i+1]);
|
||||
i++;
|
||||
@ -605,6 +611,8 @@ static void usage() {
|
||||
" -s <socket> Server socket (overrides hostname and port)\n"
|
||||
" -a <password> Password to use when connecting to the server\n"
|
||||
" -r <repeat> Execute specified command N times\n"
|
||||
" -i <interval> When -r is used, waits <interval> seconds per command.\n"
|
||||
" It is possible to specify sub-second times like -i 0.1.\n"
|
||||
" -n <db> Database number\n"
|
||||
" -x Read last argument from STDIN\n"
|
||||
" -d <delimiter> Multi-bulk delimiter in for raw formatting (default: \\n)\n"
|
||||
@ -616,6 +624,7 @@ static void usage() {
|
||||
" cat /etc/passwd | redis-cli -x set mypasswd\n"
|
||||
" redis-cli get mypasswd\n"
|
||||
" redis-cli -r 100 lpush mylist x\n"
|
||||
" redis-cli -r 100 -i 1 info | grep used_memory_human:\n"
|
||||
"\n"
|
||||
"When no command is given, redis-cli starts in interactive mode.\n"
|
||||
"Type \"help\" in interactive mode for information on available commands.\n"
|
||||
@ -681,14 +690,25 @@ static void repl() {
|
||||
linenoiseClearScreen();
|
||||
} else {
|
||||
long long start_time = mstime(), elapsed;
|
||||
int repeat, skipargs = 0;
|
||||
|
||||
if (cliSendCommand(argc,argv,1) != REDIS_OK) {
|
||||
repeat = atoi(argv[0]);
|
||||
if (repeat) {
|
||||
skipargs = 1;
|
||||
} else {
|
||||
repeat = 1;
|
||||
}
|
||||
|
||||
if (cliSendCommand(argc-skipargs,argv+skipargs,repeat)
|
||||
!= REDIS_OK)
|
||||
{
|
||||
cliConnect(1);
|
||||
|
||||
/* If we still cannot send the command,
|
||||
* print error and abort. */
|
||||
if (cliSendCommand(argc,argv,1) != REDIS_OK)
|
||||
cliPrintContextErrorAndExit();
|
||||
/* If we still cannot send the command print error.
|
||||
* We'll try to reconnect the next time. */
|
||||
if (cliSendCommand(argc-skipargs,argv+skipargs,repeat)
|
||||
!= REDIS_OK)
|
||||
cliPrintContextError();
|
||||
}
|
||||
elapsed = mstime()-start_time;
|
||||
if (elapsed >= 500) {
|
||||
@ -726,6 +746,7 @@ int main(int argc, char **argv) {
|
||||
config.hostport = 6379;
|
||||
config.hostsocket = NULL;
|
||||
config.repeat = 1;
|
||||
config.interval = 0;
|
||||
config.dbnum = 0;
|
||||
config.interactive = 0;
|
||||
config.shutdown = 0;
|
||||
@ -741,11 +762,15 @@ int main(int argc, char **argv) {
|
||||
argc -= firstarg;
|
||||
argv += firstarg;
|
||||
|
||||
/* Try to connect */
|
||||
if (cliConnect(0) != REDIS_OK) exit(1);
|
||||
|
||||
/* Start interactive mode when no command is provided */
|
||||
if (argc == 0) repl();
|
||||
if (argc == 0) {
|
||||
/* Note that in repl mode we don't abort on connection error.
|
||||
* A new attempt will be performed for every command send. */
|
||||
cliConnect(0);
|
||||
repl();
|
||||
}
|
||||
|
||||
/* Otherwise, we have some arguments to execute */
|
||||
if (cliConnect(0) != REDIS_OK) exit(1);
|
||||
return noninteractive(argc,convertToSds(argc,argv));
|
||||
}
|
||||
|
12
src/redis.c
12
src/redis.c
@ -116,9 +116,9 @@ struct redisCommand redisCommandTable[] = {
|
||||
{"sdiff",sdiffCommand,-2,REDIS_CMD_DENYOOM,NULL,1,-1,1,0,0},
|
||||
{"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_DENYOOM,NULL,2,-1,1,0,0},
|
||||
{"smembers",sinterCommand,2,0,NULL,1,1,1,0,0},
|
||||
{"zadd",zaddCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0},
|
||||
{"zadd",zaddCommand,-4,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0},
|
||||
{"zincrby",zincrbyCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1,0,0},
|
||||
{"zrem",zremCommand,3,0,NULL,1,1,1,0,0},
|
||||
{"zrem",zremCommand,-3,0,NULL,1,1,1,0,0},
|
||||
{"zremrangebyscore",zremrangebyscoreCommand,4,0,NULL,1,1,1,0,0},
|
||||
{"zremrangebyrank",zremrangebyrankCommand,4,0,NULL,1,1,1,0,0},
|
||||
{"zunionstore",zunionstoreCommand,-4,REDIS_CMD_DENYOOM,zunionInterGetKeys,0,0,0,0,0},
|
||||
@ -871,6 +871,7 @@ void initServerConfig() {
|
||||
server.masterport = 6379;
|
||||
server.master = NULL;
|
||||
server.replstate = REDIS_REPL_NONE;
|
||||
server.repl_syncio_timeout = REDIS_REPL_SYNCIO_TIMEOUT;
|
||||
server.repl_serve_stale_data = 1;
|
||||
|
||||
/* Double constants initialization */
|
||||
@ -963,6 +964,7 @@ void initServer() {
|
||||
server.stat_keyspace_misses = 0;
|
||||
server.stat_keyspace_hits = 0;
|
||||
server.stat_peak_memory = 0;
|
||||
server.stat_fork_time = 0;
|
||||
server.unixtime = time(NULL);
|
||||
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
|
||||
if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
|
||||
@ -1438,7 +1440,8 @@ sds genRedisInfoString(char *section) {
|
||||
"keyspace_hits:%lld\r\n"
|
||||
"keyspace_misses:%lld\r\n"
|
||||
"pubsub_channels:%ld\r\n"
|
||||
"pubsub_patterns:%u\r\n",
|
||||
"pubsub_patterns:%u\r\n"
|
||||
"latest_fork_usec:%lld\r\n",
|
||||
server.stat_numconnections,
|
||||
server.stat_numcommands,
|
||||
server.stat_expiredkeys,
|
||||
@ -1446,7 +1449,8 @@ sds genRedisInfoString(char *section) {
|
||||
server.stat_keyspace_hits,
|
||||
server.stat_keyspace_misses,
|
||||
dictSize(server.pubsub_channels),
|
||||
listLength(server.pubsub_patterns));
|
||||
listLength(server.pubsub_patterns),
|
||||
server.stat_fork_time);
|
||||
}
|
||||
|
||||
/* Replication */
|
||||
|
16
src/redis.h
16
src/redis.h
@ -41,7 +41,6 @@
|
||||
#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
|
||||
#define REDIS_IOBUF_LEN 1024
|
||||
#define REDIS_LOADBUF_LEN 1024
|
||||
#define REDIS_STATIC_ARGS 8
|
||||
#define REDIS_DEFAULT_DBNUM 16
|
||||
#define REDIS_CONFIGLINE_MAX 1024
|
||||
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
|
||||
@ -153,10 +152,14 @@
|
||||
#define REDIS_REQ_MULTIBULK 2
|
||||
|
||||
/* Slave replication state - slave side */
|
||||
#define REDIS_REPL_NONE 0 /* No active replication */
|
||||
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
|
||||
#define REDIS_REPL_TRANSFER 2 /* Receiving .rdb from master */
|
||||
#define REDIS_REPL_CONNECTED 3 /* Connected to master */
|
||||
#define REDIS_REPL_NONE 0 /* No active replication */
|
||||
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
|
||||
#define REDIS_REPL_CONNECTING 2 /* Connecting to master */
|
||||
#define REDIS_REPL_TRANSFER 3 /* Receiving .rdb from master */
|
||||
#define REDIS_REPL_CONNECTED 4 /* Connected to master */
|
||||
|
||||
/* Synchronous read timeout - slave side */
|
||||
#define REDIS_REPL_SYNCIO_TIMEOUT 5
|
||||
|
||||
/* Slave replication state - from the point of view of master
|
||||
* Note that in SEND_BULK and ONLINE state the slave receives new updates
|
||||
@ -543,6 +546,7 @@ struct redisServer {
|
||||
long long stat_keyspace_hits; /* number of successful lookups of keys */
|
||||
long long stat_keyspace_misses; /* number of failed lookups of keys */
|
||||
size_t stat_peak_memory; /* max used memory record */
|
||||
long long stat_fork_time; /* time needed to perform latets fork() */
|
||||
/* Configuration */
|
||||
int verbosity;
|
||||
int maxidletime;
|
||||
@ -585,6 +589,7 @@ struct redisServer {
|
||||
char *masterhost;
|
||||
int masterport;
|
||||
redisClient *master; /* client that is master for this slave */
|
||||
int repl_syncio_timeout; /* timeout for synchronous I/O calls */
|
||||
int replstate; /* replication status if the instance is a slave */
|
||||
off_t repl_transfer_left; /* bytes left reading .rdb */
|
||||
int repl_transfer_s; /* slave -> master SYNC socket */
|
||||
@ -888,7 +893,6 @@ int fwriteBulkCount(FILE *fp, char prefix, int count);
|
||||
/* Replication */
|
||||
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
|
||||
void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc);
|
||||
int syncWithMaster(void);
|
||||
void updateSlavesWaitingBgsave(int bgsaveerr);
|
||||
void replicationCron(void);
|
||||
|
||||
|
@ -10,38 +10,8 @@
|
||||
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
||||
listNode *ln;
|
||||
listIter li;
|
||||
int outc = 0, j;
|
||||
robj **outv;
|
||||
/* We need 1+(ARGS*3) objects since commands are using the new protocol
|
||||
* and we one 1 object for the first "*<count>\r\n" multibulk count, then
|
||||
* for every additional object we have "$<count>\r\n" + object + "\r\n". */
|
||||
robj *static_outv[REDIS_STATIC_ARGS*3+1];
|
||||
robj *lenobj;
|
||||
int j;
|
||||
|
||||
if (argc <= REDIS_STATIC_ARGS) {
|
||||
outv = static_outv;
|
||||
} else {
|
||||
outv = zmalloc(sizeof(robj*)*(argc*3+1));
|
||||
}
|
||||
|
||||
lenobj = createObject(REDIS_STRING,
|
||||
sdscatprintf(sdsempty(), "*%d\r\n", argc));
|
||||
lenobj->refcount = 0;
|
||||
outv[outc++] = lenobj;
|
||||
for (j = 0; j < argc; j++) {
|
||||
lenobj = createObject(REDIS_STRING,
|
||||
sdscatprintf(sdsempty(),"$%lu\r\n",
|
||||
(unsigned long) stringObjectLen(argv[j])));
|
||||
lenobj->refcount = 0;
|
||||
outv[outc++] = lenobj;
|
||||
outv[outc++] = argv[j];
|
||||
outv[outc++] = shared.crlf;
|
||||
}
|
||||
|
||||
/* Increment all the refcounts at start and decrement at end in order to
|
||||
* be sure to free objects if there is no slave in a replication state
|
||||
* able to be feed with commands */
|
||||
for (j = 0; j < outc; j++) incrRefCount(outv[j]);
|
||||
listRewind(slaves,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
redisClient *slave = ln->value;
|
||||
@ -49,7 +19,9 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
||||
/* Don't feed slaves that are still waiting for BGSAVE to start */
|
||||
if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
|
||||
|
||||
/* Feed all the other slaves, MONITORs and so on */
|
||||
/* Feed slaves that are waiting for the initial SYNC (so these commands
|
||||
* are queued in the output buffer until the intial SYNC completes),
|
||||
* or are already in sync with the master. */
|
||||
if (slave->slaveseldb != dictid) {
|
||||
robj *selectcmd;
|
||||
|
||||
@ -73,10 +45,9 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
||||
addReply(slave,selectcmd);
|
||||
slave->slaveseldb = dictid;
|
||||
}
|
||||
for (j = 0; j < outc; j++) addReply(slave,outv[j]);
|
||||
addReplyMultiBulkLen(slave,argc);
|
||||
for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]);
|
||||
}
|
||||
for (j = 0; j < outc; j++) decrRefCount(outv[j]);
|
||||
if (outv != static_outv) zfree(outv);
|
||||
}
|
||||
|
||||
void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc) {
|
||||
@ -315,19 +286,18 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
/* If repl_transfer_left == -1 we still have to read the bulk length
|
||||
* from the master reply. */
|
||||
if (server.repl_transfer_left == -1) {
|
||||
if (syncReadLine(fd,buf,1024,3600) == -1) {
|
||||
if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout) == -1) {
|
||||
redisLog(REDIS_WARNING,
|
||||
"I/O error reading bulk count from MASTER: %s",
|
||||
strerror(errno));
|
||||
replicationAbortSyncTransfer();
|
||||
return;
|
||||
goto error;
|
||||
}
|
||||
|
||||
if (buf[0] == '-') {
|
||||
redisLog(REDIS_WARNING,
|
||||
"MASTER aborted replication with an error: %s",
|
||||
buf+1);
|
||||
replicationAbortSyncTransfer();
|
||||
return;
|
||||
goto error;
|
||||
} else if (buf[0] == '\0') {
|
||||
/* At this stage just a newline works as a PING in order to take
|
||||
* the connection live. So we refresh our last interaction
|
||||
@ -336,8 +306,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
return;
|
||||
} else if (buf[0] != '$') {
|
||||
redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
|
||||
replicationAbortSyncTransfer();
|
||||
return;
|
||||
goto error;
|
||||
}
|
||||
server.repl_transfer_left = strtol(buf+1,NULL,10);
|
||||
redisLog(REDIS_NOTICE,
|
||||
@ -359,8 +328,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
server.repl_transfer_lastio = time(NULL);
|
||||
if (write(server.repl_transfer_fd,buf,nread) != nread) {
|
||||
redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
|
||||
replicationAbortSyncTransfer();
|
||||
return;
|
||||
goto error;
|
||||
}
|
||||
server.repl_transfer_left -= nread;
|
||||
/* Check if the transfer is now complete */
|
||||
@ -391,48 +359,54 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
server.replstate = REDIS_REPL_CONNECTED;
|
||||
redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
|
||||
}
|
||||
|
||||
return;
|
||||
|
||||
error:
|
||||
replicationAbortSyncTransfer();
|
||||
return;
|
||||
}
|
||||
|
||||
int syncWithMaster(void) {
|
||||
char buf[1024], tmpfile[256], authcmd[1024];
|
||||
int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
|
||||
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
char buf[1024], tmpfile[256];
|
||||
int dfd, maxtries = 5;
|
||||
REDIS_NOTUSED(el);
|
||||
REDIS_NOTUSED(privdata);
|
||||
REDIS_NOTUSED(mask);
|
||||
|
||||
if (fd == -1) {
|
||||
redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
|
||||
strerror(errno));
|
||||
return REDIS_ERR;
|
||||
}
|
||||
/* This event should only be triggered once since it is used to have a
|
||||
* non-blocking connect(2) to the master. It has been triggered when this
|
||||
* function is called, so we can delete it. */
|
||||
aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
|
||||
|
||||
/* AUTH with the master if required. */
|
||||
if(server.masterauth) {
|
||||
snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
|
||||
if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
|
||||
close(fd);
|
||||
char authcmd[1024];
|
||||
size_t authlen;
|
||||
|
||||
authlen = snprintf(authcmd,sizeof(authcmd),"AUTH %s\r\n",server.masterauth);
|
||||
if (syncWrite(fd,authcmd,authlen,server.repl_syncio_timeout) == -1) {
|
||||
redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
|
||||
strerror(errno));
|
||||
return REDIS_ERR;
|
||||
}
|
||||
goto error;
|
||||
}
|
||||
/* Read the AUTH result. */
|
||||
if (syncReadLine(fd,buf,1024,3600) == -1) {
|
||||
close(fd);
|
||||
if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout) == -1) {
|
||||
redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
|
||||
strerror(errno));
|
||||
return REDIS_ERR;
|
||||
goto error;
|
||||
}
|
||||
if (buf[0] != '+') {
|
||||
close(fd);
|
||||
redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
|
||||
return REDIS_ERR;
|
||||
goto error;
|
||||
}
|
||||
}
|
||||
|
||||
/* Issue the SYNC command */
|
||||
if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
|
||||
close(fd);
|
||||
if (syncWrite(fd,"SYNC \r\n",7,server.repl_syncio_timeout) == -1) {
|
||||
redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
|
||||
strerror(errno));
|
||||
return REDIS_ERR;
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* Prepare a suitable temp file for bulk transfer */
|
||||
@ -444,25 +418,51 @@ int syncWithMaster(void) {
|
||||
sleep(1);
|
||||
}
|
||||
if (dfd == -1) {
|
||||
close(fd);
|
||||
redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
|
||||
return REDIS_ERR;
|
||||
goto error;
|
||||
}
|
||||
|
||||
/* Setup the non blocking download of the bulk file. */
|
||||
if (aeCreateFileEvent(server.el, fd, AE_READABLE, readSyncBulkPayload, NULL)
|
||||
if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
|
||||
== AE_ERR)
|
||||
{
|
||||
redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
|
||||
goto error;
|
||||
}
|
||||
|
||||
server.replstate = REDIS_REPL_TRANSFER;
|
||||
server.repl_transfer_left = -1;
|
||||
server.repl_transfer_fd = dfd;
|
||||
server.repl_transfer_lastio = time(NULL);
|
||||
server.repl_transfer_tmpfile = zstrdup(tmpfile);
|
||||
return;
|
||||
|
||||
error:
|
||||
server.replstate = REDIS_REPL_CONNECT;
|
||||
close(fd);
|
||||
return;
|
||||
}
|
||||
|
||||
int connectWithMaster(void) {
|
||||
int fd;
|
||||
|
||||
fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
|
||||
if (fd == -1) {
|
||||
redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
|
||||
strerror(errno));
|
||||
return REDIS_ERR;
|
||||
}
|
||||
|
||||
if (aeCreateFileEvent(server.el,fd,AE_WRITABLE,syncWithMaster,NULL) ==
|
||||
AE_ERR)
|
||||
{
|
||||
close(fd);
|
||||
redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
|
||||
return REDIS_ERR;
|
||||
}
|
||||
server.replstate = REDIS_REPL_TRANSFER;
|
||||
server.repl_transfer_left = -1;
|
||||
|
||||
server.repl_transfer_s = fd;
|
||||
server.repl_transfer_fd = dfd;
|
||||
server.repl_transfer_lastio = time(NULL);
|
||||
server.repl_transfer_tmpfile = zstrdup(tmpfile);
|
||||
server.replstate = REDIS_REPL_CONNECTING;
|
||||
return REDIS_OK;
|
||||
}
|
||||
|
||||
@ -517,8 +517,8 @@ void replicationCron(void) {
|
||||
/* Check if we should connect to a MASTER */
|
||||
if (server.replstate == REDIS_REPL_CONNECT) {
|
||||
redisLog(REDIS_NOTICE,"Connecting to MASTER...");
|
||||
if (syncWithMaster() == REDIS_OK) {
|
||||
redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started: SYNC sent");
|
||||
if (connectWithMaster() == REDIS_OK) {
|
||||
redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
|
||||
if (server.appendonly) rewriteAppendOnlyFileBackground();
|
||||
}
|
||||
}
|
||||
|
@ -249,8 +249,11 @@ void sremCommand(redisClient *c) {
|
||||
|
||||
for (j = 2; j < c->argc; j++) {
|
||||
if (setTypeRemove(set,c->argv[j])) {
|
||||
if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
|
||||
deleted++;
|
||||
if (setTypeSize(set) == 0) {
|
||||
dbDelete(c->db,c->argv[1]);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (deleted) {
|
||||
|
233
src/t_zset.c
233
src/t_zset.c
@ -810,11 +810,29 @@ void zaddGenericCommand(redisClient *c, int incr) {
|
||||
robj *ele;
|
||||
robj *zobj;
|
||||
robj *curobj;
|
||||
double score, curscore = 0.0;
|
||||
double score = 0, *scores, curscore = 0.0;
|
||||
int j, elements = (c->argc-2)/2;
|
||||
int added = 0;
|
||||
|
||||
if (getDoubleFromObjectOrReply(c,c->argv[2],&score,NULL) != REDIS_OK)
|
||||
if (c->argc % 2) {
|
||||
addReply(c,shared.syntaxerr);
|
||||
return;
|
||||
}
|
||||
|
||||
/* Start parsing all the scores, we need to emit any syntax error
|
||||
* before executing additions to the sorted set, as the command should
|
||||
* either execute fully or nothing at all. */
|
||||
scores = zmalloc(sizeof(double)*elements);
|
||||
for (j = 0; j < elements; j++) {
|
||||
if (getDoubleFromObjectOrReply(c,c->argv[2+j*2],&scores[j],NULL)
|
||||
!= REDIS_OK)
|
||||
{
|
||||
zfree(scores);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/* Lookup the key and create the sorted set if does not exist. */
|
||||
zobj = lookupKeyWrite(c->db,key);
|
||||
if (zobj == NULL) {
|
||||
if (server.zset_max_ziplist_entries == 0 ||
|
||||
@ -828,111 +846,105 @@ void zaddGenericCommand(redisClient *c, int incr) {
|
||||
} else {
|
||||
if (zobj->type != REDIS_ZSET) {
|
||||
addReply(c,shared.wrongtypeerr);
|
||||
zfree(scores);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
|
||||
unsigned char *eptr;
|
||||
for (j = 0; j < elements; j++) {
|
||||
score = scores[j];
|
||||
|
||||
/* Prefer non-encoded element when dealing with ziplists. */
|
||||
ele = c->argv[3];
|
||||
if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
|
||||
if (incr) {
|
||||
score += curscore;
|
||||
if (isnan(score)) {
|
||||
addReplyError(c,nanerr);
|
||||
/* Don't need to check if the sorted set is empty, because
|
||||
* we know it has at least one element. */
|
||||
return;
|
||||
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
|
||||
unsigned char *eptr;
|
||||
|
||||
/* Prefer non-encoded element when dealing with ziplists. */
|
||||
ele = c->argv[3+j*2];
|
||||
if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
|
||||
if (incr) {
|
||||
score += curscore;
|
||||
if (isnan(score)) {
|
||||
addReplyError(c,nanerr);
|
||||
/* Don't need to check if the sorted set is empty
|
||||
* because we know it has at least one element. */
|
||||
zfree(scores);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Remove and re-insert when score changed. */
|
||||
if (score != curscore) {
|
||||
zobj->ptr = zzlDelete(zobj->ptr,eptr);
|
||||
/* Remove and re-insert when score changed. */
|
||||
if (score != curscore) {
|
||||
zobj->ptr = zzlDelete(zobj->ptr,eptr);
|
||||
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
|
||||
|
||||
signalModifiedKey(c->db,key);
|
||||
server.dirty++;
|
||||
}
|
||||
} else {
|
||||
/* Optimize: check if the element is too large or the list
|
||||
* becomes too long *before* executing zzlInsert. */
|
||||
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
|
||||
if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
|
||||
zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
|
||||
if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
|
||||
zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
|
||||
|
||||
signalModifiedKey(c->db,key);
|
||||
server.dirty++;
|
||||
if (!incr) added++;
|
||||
}
|
||||
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
|
||||
zset *zs = zobj->ptr;
|
||||
zskiplistNode *znode;
|
||||
dictEntry *de;
|
||||
|
||||
if (incr) /* ZINCRBY */
|
||||
addReplyDouble(c,score);
|
||||
else /* ZADD */
|
||||
addReply(c,shared.czero);
|
||||
} else {
|
||||
/* Optimize: check if the element is too large or the list becomes
|
||||
* too long *before* executing zzlInsert. */
|
||||
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
|
||||
if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
|
||||
zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
|
||||
if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
|
||||
zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
|
||||
ele = c->argv[3+j*2] = tryObjectEncoding(c->argv[3+j*2]);
|
||||
de = dictFind(zs->dict,ele);
|
||||
if (de != NULL) {
|
||||
curobj = dictGetEntryKey(de);
|
||||
curscore = *(double*)dictGetEntryVal(de);
|
||||
|
||||
signalModifiedKey(c->db,key);
|
||||
server.dirty++;
|
||||
|
||||
if (incr) /* ZINCRBY */
|
||||
addReplyDouble(c,score);
|
||||
else /* ZADD */
|
||||
addReply(c,shared.cone);
|
||||
}
|
||||
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
|
||||
zset *zs = zobj->ptr;
|
||||
zskiplistNode *znode;
|
||||
dictEntry *de;
|
||||
|
||||
ele = c->argv[3] = tryObjectEncoding(c->argv[3]);
|
||||
de = dictFind(zs->dict,ele);
|
||||
if (de != NULL) {
|
||||
curobj = dictGetEntryKey(de);
|
||||
curscore = *(double*)dictGetEntryVal(de);
|
||||
|
||||
if (incr) {
|
||||
score += curscore;
|
||||
if (isnan(score)) {
|
||||
addReplyError(c,nanerr);
|
||||
/* Don't need to check if the sorted set is empty, because
|
||||
* we know it has at least one element. */
|
||||
return;
|
||||
if (incr) {
|
||||
score += curscore;
|
||||
if (isnan(score)) {
|
||||
addReplyError(c,nanerr);
|
||||
/* Don't need to check if the sorted set is empty
|
||||
* because we know it has at least one element. */
|
||||
zfree(scores);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Remove and re-insert when score changed. We can safely delete
|
||||
* the key object from the skiplist, since the dictionary still has
|
||||
* a reference to it. */
|
||||
if (score != curscore) {
|
||||
redisAssert(zslDelete(zs->zsl,curscore,curobj));
|
||||
znode = zslInsert(zs->zsl,score,curobj);
|
||||
incrRefCount(curobj); /* Re-inserted in skiplist. */
|
||||
dictGetEntryVal(de) = &znode->score; /* Update score ptr. */
|
||||
/* Remove and re-insert when score changed. We can safely
|
||||
* delete the key object from the skiplist, since the
|
||||
* dictionary still has a reference to it. */
|
||||
if (score != curscore) {
|
||||
redisAssert(zslDelete(zs->zsl,curscore,curobj));
|
||||
znode = zslInsert(zs->zsl,score,curobj);
|
||||
incrRefCount(curobj); /* Re-inserted in skiplist. */
|
||||
dictGetEntryVal(de) = &znode->score; /* Update score ptr. */
|
||||
|
||||
signalModifiedKey(c->db,key);
|
||||
server.dirty++;
|
||||
}
|
||||
} else {
|
||||
znode = zslInsert(zs->zsl,score,ele);
|
||||
incrRefCount(ele); /* Inserted in skiplist. */
|
||||
redisAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
|
||||
incrRefCount(ele); /* Added to dictionary. */
|
||||
|
||||
signalModifiedKey(c->db,key);
|
||||
server.dirty++;
|
||||
if (!incr) added++;
|
||||
}
|
||||
|
||||
if (incr) /* ZINCRBY */
|
||||
addReplyDouble(c,score);
|
||||
else /* ZADD */
|
||||
addReply(c,shared.czero);
|
||||
} else {
|
||||
znode = zslInsert(zs->zsl,score,ele);
|
||||
incrRefCount(ele); /* Inserted in skiplist. */
|
||||
redisAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
|
||||
incrRefCount(ele); /* Added to dictionary. */
|
||||
|
||||
signalModifiedKey(c->db,key);
|
||||
server.dirty++;
|
||||
|
||||
if (incr) /* ZINCRBY */
|
||||
addReplyDouble(c,score);
|
||||
else /* ZADD */
|
||||
addReply(c,shared.cone);
|
||||
redisPanic("Unknown sorted set encoding");
|
||||
}
|
||||
} else {
|
||||
redisPanic("Unknown sorted set encoding");
|
||||
}
|
||||
zfree(scores);
|
||||
if (incr) /* ZINCRBY */
|
||||
addReplyDouble(c,score);
|
||||
else /* ZADD */
|
||||
addReplyLongLong(c,added);
|
||||
}
|
||||
|
||||
void zaddCommand(redisClient *c) {
|
||||
@ -945,8 +957,8 @@ void zincrbyCommand(redisClient *c) {
|
||||
|
||||
void zremCommand(redisClient *c) {
|
||||
robj *key = c->argv[1];
|
||||
robj *ele = c->argv[2];
|
||||
robj *zobj;
|
||||
int deleted = 0, j;
|
||||
|
||||
if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
|
||||
checkType(c,zobj,REDIS_ZSET)) return;
|
||||
@ -954,39 +966,48 @@ void zremCommand(redisClient *c) {
|
||||
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
|
||||
unsigned char *eptr;
|
||||
|
||||
if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) {
|
||||
zobj->ptr = zzlDelete(zobj->ptr,eptr);
|
||||
if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
|
||||
} else {
|
||||
addReply(c,shared.czero);
|
||||
return;
|
||||
for (j = 2; j < c->argc; j++) {
|
||||
if ((eptr = zzlFind(zobj->ptr,c->argv[j],NULL)) != NULL) {
|
||||
deleted++;
|
||||
zobj->ptr = zzlDelete(zobj->ptr,eptr);
|
||||
if (zzlLength(zobj->ptr) == 0) {
|
||||
dbDelete(c->db,key);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
|
||||
zset *zs = zobj->ptr;
|
||||
dictEntry *de;
|
||||
double score;
|
||||
|
||||
de = dictFind(zs->dict,ele);
|
||||
if (de != NULL) {
|
||||
/* Delete from the skiplist */
|
||||
score = *(double*)dictGetEntryVal(de);
|
||||
redisAssert(zslDelete(zs->zsl,score,ele));
|
||||
for (j = 2; j < c->argc; j++) {
|
||||
de = dictFind(zs->dict,c->argv[j]);
|
||||
if (de != NULL) {
|
||||
deleted++;
|
||||
|
||||
/* Delete from the hash table */
|
||||
dictDelete(zs->dict,ele);
|
||||
if (htNeedsResize(zs->dict)) dictResize(zs->dict);
|
||||
if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
|
||||
} else {
|
||||
addReply(c,shared.czero);
|
||||
return;
|
||||
/* Delete from the skiplist */
|
||||
score = *(double*)dictGetEntryVal(de);
|
||||
redisAssert(zslDelete(zs->zsl,score,c->argv[j]));
|
||||
|
||||
/* Delete from the hash table */
|
||||
dictDelete(zs->dict,c->argv[j]);
|
||||
if (htNeedsResize(zs->dict)) dictResize(zs->dict);
|
||||
if (dictSize(zs->dict) == 0) {
|
||||
dbDelete(c->db,key);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
redisPanic("Unknown sorted set encoding");
|
||||
}
|
||||
|
||||
signalModifiedKey(c->db,key);
|
||||
server.dirty++;
|
||||
addReply(c,shared.cone);
|
||||
if (deleted) {
|
||||
signalModifiedKey(c->db,key);
|
||||
server.dirty += deleted;
|
||||
}
|
||||
addReplyLongLong(c,deleted);
|
||||
}
|
||||
|
||||
void zremrangebyscoreCommand(redisClient *c) {
|
||||
|
@ -110,6 +110,13 @@ proc cleanup {} {
|
||||
}
|
||||
|
||||
proc execute_everything {} {
|
||||
if 0 {
|
||||
# Use this when hacking on new tests.
|
||||
set ::verbose 1
|
||||
execute_tests "unit/first"
|
||||
return
|
||||
}
|
||||
|
||||
execute_tests "unit/printver"
|
||||
execute_tests "unit/auth"
|
||||
execute_tests "unit/protocol"
|
||||
|
@ -105,6 +105,12 @@ start_server {
|
||||
lsort [r smembers myset]
|
||||
} {a c}
|
||||
|
||||
test {SREM variadic version with more args needed to destroy the key} {
|
||||
r del myset
|
||||
r sadd myset 1 2 3
|
||||
r srem myset 1 2 3 4 5 6 7 8
|
||||
} {3}
|
||||
|
||||
foreach {type} {hashtable intset} {
|
||||
for {set i 1} {$i <= 5} {incr i} {
|
||||
r del [format "set%d" $i]
|
||||
|
@ -48,6 +48,34 @@ start_server {tags {"zset"}} {
|
||||
assert_error "*NaN*" {r zincrby myzset -inf abc}
|
||||
}
|
||||
|
||||
test {ZADD - Variadic version base case} {
|
||||
r del myzset
|
||||
list [r zadd myzset 10 a 20 b 30 c] [r zrange myzset 0 -1 withscores]
|
||||
} {3 {a 10 b 20 c 30}}
|
||||
|
||||
test {ZADD - Return value is the number of actually added items} {
|
||||
list [r zadd myzset 5 x 20 b 30 c] [r zrange myzset 0 -1 withscores]
|
||||
} {1 {x 5 a 10 b 20 c 30}}
|
||||
|
||||
test {ZADD - Variadic version does not add nothing on single parsing err} {
|
||||
r del myzset
|
||||
catch {r zadd myzset 10 a 20 b 30.badscore c} e
|
||||
assert_match {*ERR*not*double*} $e
|
||||
r exists myzset
|
||||
} {0}
|
||||
|
||||
test {ZADD - Variadic version will raise error on missing arg} {
|
||||
r del myzset
|
||||
catch {r zadd myzset 10 a 20 b 30 c 40} e
|
||||
assert_match {*ERR*syntax*} $e
|
||||
}
|
||||
|
||||
test {ZINCRBY does not work variadic even if shares ZADD implementation} {
|
||||
r del myzset
|
||||
catch {r zincrby myzset 10 a 20 b 30 c} e
|
||||
assert_match {*ERR*wrong*number*arg*} $e
|
||||
}
|
||||
|
||||
test "ZCARD basics - $encoding" {
|
||||
assert_equal 3 [r zcard ztmp]
|
||||
assert_equal 0 [r zcard zdoesntexist]
|
||||
@ -65,6 +93,21 @@ start_server {tags {"zset"}} {
|
||||
assert_equal 0 [r exists ztmp]
|
||||
}
|
||||
|
||||
test "ZREM variadic version" {
|
||||
r del ztmp
|
||||
r zadd ztmp 10 a 20 b 30 c
|
||||
assert_equal 2 [r zrem ztmp x y a b k]
|
||||
assert_equal 0 [r zrem ztmp foo bar]
|
||||
assert_equal 1 [r zrem ztmp c]
|
||||
r exists ztmp
|
||||
} {0}
|
||||
|
||||
test "ZREM variadic version -- remove elements after key deletion" {
|
||||
r del ztmp
|
||||
r zadd ztmp 10 a 20 b 30 c
|
||||
r zrem ztmp a b c d e f g
|
||||
} {3}
|
||||
|
||||
test "ZRANGE basics - $encoding" {
|
||||
r del ztmp
|
||||
r zadd ztmp 1 a
|
||||
|
Loading…
x
Reference in New Issue
Block a user