diff --git a/src/cluster.c b/src/cluster.c index 93e19920..fd295df3 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -4745,16 +4745,22 @@ try_again: /* Read the RESTORE replies. */ int error_from_target = 0; + int del_idx = 1; /* Index of the key argument for the replicated DEL op. */ + robj **newargv; + + if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1)); + for (j = 0; j < num_keys; j++) { if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0) goto socket_err; - if (error_from_target) continue; /* Just consume the replies. */ if ((select && buf1[0] == '-') || buf2[0] == '-') { /* On error assume that last_dbid is no longer valid. */ - cs->last_dbid = -1; - addReplyErrorFormat(c,"Target instance replied with error: %s", - (select && buf1[0] == '-') ? buf1+1 : buf2+1); - error_from_target = 1; + if (!error_from_target) { + cs->last_dbid = -1; + addReplyErrorFormat(c,"Target instance replied with error: %s", + (select && buf1[0] == '-') ? buf1+1 : buf2+1); + error_from_target = 1; + } } else { if (!copy) { robj *aux; @@ -4764,17 +4770,23 @@ try_again: signalModifiedKey(c->db,kv[j]); server.dirty++; - /* Translate MIGRATE as DEL for replication/AOF. */ - if (j == 0) { - aux = createStringObject("DEL",3); - rewriteClientCommandArgument(c,0,aux); - decrRefCount(aux); - } - rewriteClientCommandArgument(c,j+1,kv[j]); + /* Populate the argument vector to replace the old one. */ + newargv[del_idx++] = kv[j]; } } } + if (!copy) { + /* Translate MIGRATE as DEL for replication/AOF. */ + if (del_idx > 1) { + newargv[0] = createStringObject("DEL",3); + replaceClientCommandVector(c,newargv,del_idx); + } else { + /* No key transfer acknowledged, no need to rewrite as DEL. */ + zfree(newargv); + } + } + if (!error_from_target) { /* Update the last_dbid in migrateCachedSocket and reply +OK. */ cs->last_dbid = dbid; diff --git a/src/networking.c b/src/networking.c index ef0ec732..047e8d22 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1597,15 +1597,28 @@ void replaceClientCommandVector(client *c, int argc, robj **argv) { } /* Rewrite a single item in the command vector. - * The new val ref count is incremented, and the old decremented. */ + * The new val ref count is incremented, and the old decremented. + * + * It is possible to specify an argument over the current size of the + * argument vector: in this case the array of objects gets reallocated + * and c->argc set to the max value. However it's up to the caller to + * + * 1. Make sure there are no "holes" and all the arguments are set. + * 2. If the original argument vector was longer than the one we + * want to end with, it's up to the caller to set c->argc and + * free the no longer used objects on c->argv. */ void rewriteClientCommandArgument(client *c, int i, robj *newval) { robj *oldval; - if (i >= c->argc) c->argv = zrealloc(c->argv,sizeof(robj*)*(i+1)); + if (i >= c->argc) { + c->argv = zrealloc(c->argv,sizeof(robj*)*(i+1)); + c->argc = i+1; + c->argv[i] = NULL; + } oldval = c->argv[i]; c->argv[i] = newval; incrRefCount(newval); - decrRefCount(oldval); + if (oldval) decrRefCount(oldval); /* If this is the command name make sure to fix c->cmd. */ if (i == 0) {