migrate: fix mismatch of RESTORE reply when some keys have expired.

This commit is contained in:
youjiali1995 2018-10-18 18:57:51 +08:00
parent 144832ee67
commit a6499ecac2

View File

@ -5164,10 +5164,10 @@ try_again:
serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid)); serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
} }
int expired = 0; /* Number of keys that we'll find already expired. int non_expired = 0; /* Number of keys that we'll find non expired.
Note that serializing large keys may take some time Note that serializing large keys may take some time
so certain keys that were found non expired by the so certain keys that were found non expired by the
lookupKey() function, may be expired later. */ lookupKey() function, may be expired later. */
/* Create RESTORE payload and generate the protocol to call the command. */ /* Create RESTORE payload and generate the protocol to call the command. */
for (j = 0; j < num_keys; j++) { for (j = 0; j < num_keys; j++) {
@ -5177,11 +5177,12 @@ try_again:
if (expireat != -1) { if (expireat != -1) {
ttl = expireat-mstime(); ttl = expireat-mstime();
if (ttl < 0) { if (ttl < 0) {
expired++;
continue; continue;
} }
if (ttl < 1) ttl = 1; if (ttl < 1) ttl = 1;
} }
kv[non_expired++] = kv[j];
serverAssertWithInfo(c,NULL, serverAssertWithInfo(c,NULL,
rioWriteBulkCount(&cmd,'*',replace ? 5 : 4)); rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
@ -5208,6 +5209,7 @@ try_again:
if (replace) if (replace)
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7)); serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
} }
num_keys = non_expired;
/* Transfer the query to the other node in 64K chunks. */ /* Transfer the query to the other node in 64K chunks. */
errno = 0; errno = 0;
@ -5250,7 +5252,7 @@ try_again:
* command name itself. */ * command name itself. */
if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1)); if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));
for (j = 0; j < num_keys-expired; j++) { for (j = 0; j < num_keys; j++) {
if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0) { if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0) {
socket_error = 1; socket_error = 1;
break; break;