Rewrite MIGRATE AUTH option.

See PR #2507. This is a reimplementation of the fix that contained
different problems.
This commit is contained in:
antirez 2018-01-09 18:16:22 +01:00
parent e509fbb8d9
commit 3ce1c28d47

View File

@ -4896,14 +4896,16 @@ void migrateCloseTimedoutSockets(void) {
dictReleaseIterator(di);
}
/* MIGRATE host port key dbid timeout [COPY | REPLACE]
/* MIGRATE host port key dbid timeout [COPY | REPLACE | AUTH password]
*
* On in the multiple keys form:
*
* MIGRATE host port "" dbid timeout [COPY | REPLACE] KEYS key1 key2 ... keyN */
* MIGRATE host port "" dbid timeout [COPY | REPLACE | AUTH password] KEYS key1
* key2 ... keyN */
void migrateCommand(client *c) {
migrateCachedSocket *cs;
int copy, replace, j;
int copy = 0, replace = 0, j;
char *password = NULL;
long timeout;
long dbid;
robj **ov = NULL; /* Objects to migrate. */
@ -4918,16 +4920,20 @@ void migrateCommand(client *c) {
int first_key = 3; /* Argument index of the first key. */
int num_keys = 1; /* By default only migrate the 'key' argument. */
/* Initialization */
copy = 0;
replace = 0;
/* Parse additional options */
for (j = 6; j < c->argc; j++) {
int moreargs = j < c->argc-1;
if (!strcasecmp(c->argv[j]->ptr,"copy")) {
copy = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
replace = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"auth")) {
if (!moreargs) {
addReply(c,shared.syntaxerr);
return;
}
j++;
password = c->argv[j]->ptr;
} else if (!strcasecmp(c->argv[j]->ptr,"keys")) {
if (sdslen(c->argv[3]->ptr) != 0) {
addReplyError(c,
@ -4986,6 +4992,14 @@ try_again:
rioInitWithBuffer(&cmd,sdsempty());
/* Authentication */
if (password) {
serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4));
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password,
sdslen(password)));
}
/* Send the SELECT command if the current DB is not already selected. */
int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
if (select) {
@ -5003,7 +5017,9 @@ try_again:
ttl = expireat-mstime();
if (ttl < 1) ttl = 1;
}
serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
serverAssertWithInfo(c,NULL,
rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
if (server.cluster_enabled)
serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
@ -5046,9 +5062,14 @@ try_again:
}
}
char buf0[1024]; /* Auth reply. */
char buf1[1024]; /* Select reply. */
char buf2[1024]; /* Restore reply. */
/* Read the AUTH reply if needed. */
if (password && syncReadLine(cs->fd, buf0, sizeof(buf0), timeout) <= 0)
goto socket_err;
/* Read the SELECT reply if needed. */
if (select && syncReadLine(cs->fd, buf1, sizeof(buf1), timeout) <= 0)
goto socket_err;
@ -5065,13 +5086,18 @@ try_again:
socket_error = 1;
break;
}
if ((select && buf1[0] == '-') || buf2[0] == '-') {
if (buf0[0] == '-' || (select && buf1[0] == '-') || buf2[0] == '-') {
/* On error assume that last_dbid is no longer valid. */
if (!error_from_target) {
cs->last_dbid = -1;
addReplyErrorFormat(c,"Target instance replied with error: %s",
(select && buf1[0] == '-') ? buf1+1 : buf2+1);
char *errbuf;
if (buf0[0] == '-') errbuf = buf0;
else if (select && buf1[0] == '-') errbuf = buf1;
else errbuf = buf2;
error_from_target = 1;
addReplyErrorFormat(c,"Target instance replied with error: %s",
errbuf+1);
}
} else {
if (!copy) {
@ -5136,7 +5162,7 @@ try_again:
addReply(c,shared.ok);
} else {
/* On error we already sent it in the for loop above, and set
* the curretly selected socket to -1 to force SELECT the next time. */
* the currently selected socket to -1 to force SELECT the next time. */
}
sdsfree(cmd.io.buffer.ptr);