Pipelined multiple keys MIGRATE.

This commit is contained in:
antirez 2015-12-11 11:56:45 +01:00
parent e7945cf839
commit 9ebf7a6776
2 changed files with 115 additions and 63 deletions

View File

@ -4592,18 +4592,27 @@ void migrateCloseTimedoutSockets(void) {
dictReleaseIterator(di); dictReleaseIterator(di);
} }
/* MIGRATE host port key dbid timeout [COPY | REPLACE] */ /* MIGRATE host port key dbid timeout [COPY | REPLACE]
*
* On in the multiple keys form:
*
* MIGRATE host port "" dbid timeout [COPY | REPLACE] KEYS key1 key2 ... keyN */
void migrateCommand(client *c) { void migrateCommand(client *c) {
migrateCachedSocket *cs; migrateCachedSocket *cs;
int copy, replace, j; int copy, replace, j;
long timeout; long timeout;
long dbid; long dbid;
long long ttl, expireat; long long ttl, expireat;
robj *o; robj **ov = zmalloc(sizeof(robj*)); /* Objects to migrate. */
robj **kv = zmalloc(sizeof(robj*)); /* Key names. */
rio cmd, payload; rio cmd, payload;
int retry_num = 0; int retry_num = 0;
int write_error = 0;
/* To support the KEYS option we need the following additional state. */
int first_key = 3; /* Argument index of the first key. */
int num_keys = 1; /* By default only migrate the 'key' argument. */
try_again:
/* Initialization */ /* Initialization */
copy = 0; copy = 0;
replace = 0; replace = 0;
@ -4615,6 +4624,19 @@ try_again:
copy = 1; copy = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"replace")) { } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
replace = 1; replace = 1;
} else if (!strcasecmp(c->argv[j]->ptr,"keys")) {
if (sdslen(c->argv[3]->ptr) != 0) {
addReplyError(c,
"When using MIGRATE KEYS option, the key argument"
" must be set to the empty string");
zfree(ov); zfree(kv);
return;
}
first_key = j+1;
num_keys = c->argc - j - 1;
ov = zrealloc(ov,sizeof(robj*)*num_keys);
kv = zrealloc(kv,sizeof(robj*)*num_keys);
break; /* All the remaining args are keys. */
} else { } else {
addReply(c,shared.syntaxerr); addReply(c,shared.syntaxerr);
return; return;
@ -4628,14 +4650,28 @@ try_again:
return; return;
if (timeout <= 0) timeout = 1000; if (timeout <= 0) timeout = 1000;
/* Check if the key is here. If not we reply with success as there is /* Check if the keys are here. If at least one key is to migrate, do it
* nothing to migrate (for instance the key expired in the meantime), but * otherwise if all the keys are missing reply with "NOKEY" to signal
* we include such information in the reply string. */ * the caller there was nothing to migrate. We don't return an error in
if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) { * this case, since often this is due to a normal condition like the key
* expiring in the meantime. */
int oi = 0;
for (j = 0; j < num_keys; j++) {
if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
kv[oi] = c->argv[first_key+j];
oi++;
}
}
num_keys = oi;
if (num_keys == 0) {
zfree(ov); zfree(kv);
addReplySds(c,sdsnew("+NOKEY\r\n")); addReplySds(c,sdsnew("+NOKEY\r\n"));
return; return;
} }
try_again:
write_error = 0;
/* Connect */ /* Connect */
cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout); cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
if (cs == NULL) return; /* error sent to the client by migrateGetSocket() */ if (cs == NULL) return; /* error sent to the client by migrateGetSocket() */
@ -4651,7 +4687,8 @@ try_again:
} }
/* Create RESTORE payload and generate the protocol to call the command. */ /* Create RESTORE payload and generate the protocol to call the command. */
expireat = getExpire(c->db,c->argv[3]); for (j = 0; j < num_keys; j++) {
expireat = getExpire(c->db,kv[j]);
if (expireat != -1) { if (expireat != -1) {
ttl = expireat-mstime(); ttl = expireat-mstime();
if (ttl < 1) ttl = 1; if (ttl < 1) ttl = 1;
@ -4662,15 +4699,16 @@ try_again:
rioWriteBulkString(&cmd,"RESTORE-ASKING",14)); rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
else else
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7)); serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
serverAssertWithInfo(c,NULL,sdsEncodedObject(c->argv[3])); serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr, serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
sdslen(c->argv[3]->ptr))); sdslen(kv[j]->ptr)));
serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl)); serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
/* Emit the payload argument, that is the serialized object using /* Emit the payload argument, that is the serialized object using
* the DUMP format. */ * the DUMP format. */
createDumpPayload(&payload,o); createDumpPayload(&payload,ov[j]);
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr, serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,payload.io.buffer.ptr,
sdslen(payload.io.buffer.ptr))); sdslen(payload.io.buffer.ptr)));
sdsfree(payload.io.buffer.ptr); sdsfree(payload.io.buffer.ptr);
@ -4678,6 +4716,7 @@ try_again:
* as a MIGRATE option. */ * as a MIGRATE option. */
if (replace) if (replace)
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7)); serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
}
/* Transfer the query to the other node in 64K chunks. */ /* Transfer the query to the other node in 64K chunks. */
errno = 0; errno = 0;
@ -4689,64 +4728,77 @@ try_again:
while ((towrite = sdslen(buf)-pos) > 0) { while ((towrite = sdslen(buf)-pos) > 0) {
towrite = (towrite > (64*1024) ? (64*1024) : towrite); towrite = (towrite > (64*1024) ? (64*1024) : towrite);
nwritten = syncWrite(cs->fd,buf+pos,towrite,timeout); nwritten = syncWrite(cs->fd,buf+pos,towrite,timeout);
if (nwritten != (signed)towrite) goto socket_wr_err; if (nwritten != (signed)towrite) {
write_error = 1;
goto socket_err;
}
pos += nwritten; pos += nwritten;
} }
} }
/* Read back the reply. */ char buf1[1024]; /* Select reply. */
{ char buf2[1024]; /* Restore reply. */
char buf1[1024];
char buf2[1024];
/* Read the two replies */ /* Read the SELECT reply if needed. */
if (select && syncReadLine(cs->fd, buf1, sizeof(buf1), timeout) <= 0) if (select && syncReadLine(cs->fd, buf1, sizeof(buf1), timeout) <= 0)
goto socket_rd_err; goto socket_err;
/* Read the RESTORE replies. */
int error_from_target = 0;
for (j = 0; j < num_keys; j++) {
if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0) if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0)
goto socket_rd_err; goto socket_err;
if (error_from_target) continue; /* Just consume the replies. */
if ((select && buf1[0] == '-') || buf2[0] == '-') { if ((select && buf1[0] == '-') || buf2[0] == '-') {
/* On error assume that last_dbid is no longer valid. */ /* On error assume that last_dbid is no longer valid. */
cs->last_dbid = -1; cs->last_dbid = -1;
addReplyErrorFormat(c,"Target instance replied with error: %s", addReplyErrorFormat(c,"Target instance replied with error: %s",
(select && buf1[0] == '-') ? buf1+1 : buf2+1); (select && buf1[0] == '-') ? buf1+1 : buf2+1);
error_from_target = 1;
} else { } else {
/* Update the last_dbid in migrateCachedSocket */ if (!copy) {
cs->last_dbid = dbid;
robj *aux; robj *aux;
addReply(c,shared.ok);
if (!copy) {
/* No COPY option: remove the local key, signal the change. */ /* No COPY option: remove the local key, signal the change. */
dbDelete(c->db,c->argv[3]); dbDelete(c->db,kv[j]);
signalModifiedKey(c->db,c->argv[3]); signalModifiedKey(c->db,kv[j]);
server.dirty++; server.dirty++;
/* Translate MIGRATE as DEL for replication/AOF. */ /* Translate MIGRATE as DEL for replication/AOF. */
if (j == 0) {
aux = createStringObject("DEL",3); aux = createStringObject("DEL",3);
rewriteClientCommandVector(c,2,aux,c->argv[3]); rewriteClientCommandArgument(c,0,aux);
decrRefCount(aux); decrRefCount(aux);
} }
rewriteClientCommandArgument(c,j+1,kv[j]);
}
} }
} }
if (!error_from_target) {
/* Update the last_dbid in migrateCachedSocket and reply +OK. */
cs->last_dbid = dbid;
addReply(c,shared.ok);
} else {
/* On error we already sent it in the for loop above. */
}
sdsfree(cmd.io.buffer.ptr); sdsfree(cmd.io.buffer.ptr);
zfree(ov); zfree(kv);
return; return;
socket_wr_err: /* On socket errors we try to close the cached socket and try again.
* It is very common for the cached socket to get closed, if just reopening
* it works it's a shame to notify the error to the caller. */
socket_err:
sdsfree(cmd.io.buffer.ptr); sdsfree(cmd.io.buffer.ptr);
migrateCloseSocket(c->argv[1],c->argv[2]); migrateCloseSocket(c->argv[1],c->argv[2]);
if (errno != ETIMEDOUT && retry_num++ == 0) goto try_again; if (errno != ETIMEDOUT && retry_num++ == 0) goto try_again;
zfree(ov); zfree(kv);
addReplySds(c, addReplySds(c,
sdsnew("-IOERR error or timeout writing to target instance\r\n")); sdscatprintf(sdsempty(),
return; "-IOERR error or timeout %s to target instance\r\n",
write_error ? "writing" : "reading"));
socket_rd_err:
sdsfree(cmd.io.buffer.ptr);
migrateCloseSocket(c->argv[1],c->argv[2]);
if (errno != ETIMEDOUT && retry_num++ == 0) goto try_again;
addReplySds(c,
sdsnew("-IOERR error or timeout reading from target node\r\n"));
return; return;
} }

View File

@ -1601,7 +1601,7 @@ void replaceClientCommandVector(client *c, int argc, robj **argv) {
void rewriteClientCommandArgument(client *c, int i, robj *newval) { void rewriteClientCommandArgument(client *c, int i, robj *newval) {
robj *oldval; robj *oldval;
serverAssertWithInfo(c,NULL,i < c->argc); if (i >= c->argc) c->argv = zrealloc(c->argv,sizeof(robj*)*(i+1));
oldval = c->argv[i]; oldval = c->argv[i];
c->argv[i] = newval; c->argv[i] = newval;
incrRefCount(newval); incrRefCount(newval);