mirror of
https://github.com/fluencelabs/redis
synced 2025-03-19 09:00:51 +00:00
Fix MIGRATE closing of cached socket on error.
After investigating issue #3796, it was discovered that MIGRATE could call migrateCloseSocket() after the original MIGRATE c->argv was already rewritten as a DEL operation. As a result the host/port passed to migrateCloseSocket() could be anything, often a NULL pointer that gets deferenced crashing the server. Now the socket is closed at an earlier time when there is a socket error in a later stage where no retry will be performed, before we rewrite the argument vector. Moreover a check was added so that later, in the socket_err label, there is no further attempt at closing the socket if the argument was rewritten. This fix should resolve the bug reported in #3796.
This commit is contained in:
parent
0dbfb1d154
commit
f917e0da4c
@ -4756,6 +4756,7 @@ void migrateCommand(client *c) {
|
|||||||
rio cmd, payload;
|
rio cmd, payload;
|
||||||
int may_retry = 1;
|
int may_retry = 1;
|
||||||
int write_error = 0;
|
int write_error = 0;
|
||||||
|
int argv_rewritten = 0;
|
||||||
|
|
||||||
/* To support the KEYS option we need the following additional state. */
|
/* To support the KEYS option we need the following additional state. */
|
||||||
int first_key = 3; /* Argument index of the first key. */
|
int first_key = 3; /* Argument index of the first key. */
|
||||||
@ -4939,12 +4940,20 @@ try_again:
|
|||||||
goto socket_err; /* A retry is guaranteed because of tested conditions.*/
|
goto socket_err; /* A retry is guaranteed because of tested conditions.*/
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* On socket errors, close the migration socket now that we still have
|
||||||
|
* the original host/port in the ARGV. Later the original command may be
|
||||||
|
* rewritten to DEL and will be too later. */
|
||||||
|
if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);
|
||||||
|
|
||||||
if (!copy) {
|
if (!copy) {
|
||||||
/* Translate MIGRATE as DEL for replication/AOF. */
|
/* Translate MIGRATE as DEL for replication/AOF. Note that we do
|
||||||
|
* this only for the keys for which we received an acknowledgement
|
||||||
|
* from the receiving Redis server, by using the del_idx index. */
|
||||||
if (del_idx > 1) {
|
if (del_idx > 1) {
|
||||||
newargv[0] = createStringObject("DEL",3);
|
newargv[0] = createStringObject("DEL",3);
|
||||||
/* Note that the following call takes ownership of newargv. */
|
/* Note that the following call takes ownership of newargv. */
|
||||||
replaceClientCommandVector(c,del_idx,newargv);
|
replaceClientCommandVector(c,del_idx,newargv);
|
||||||
|
argv_rewritten = 1;
|
||||||
} else {
|
} else {
|
||||||
/* No key transfer acknowledged, no need to rewrite as DEL. */
|
/* No key transfer acknowledged, no need to rewrite as DEL. */
|
||||||
zfree(newargv);
|
zfree(newargv);
|
||||||
@ -4953,8 +4962,8 @@ try_again:
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* If we are here and a socket error happened, we don't want to retry.
|
/* If we are here and a socket error happened, we don't want to retry.
|
||||||
* Just signal the problem to the client, but only do it if we don't
|
* Just signal the problem to the client, but only do it if we did not
|
||||||
* already queued a different error reported by the destination server. */
|
* already queue a different error reported by the destination server. */
|
||||||
if (!error_from_target && socket_error) {
|
if (!error_from_target && socket_error) {
|
||||||
may_retry = 0;
|
may_retry = 0;
|
||||||
goto socket_err;
|
goto socket_err;
|
||||||
@ -4962,7 +4971,11 @@ try_again:
|
|||||||
|
|
||||||
if (!error_from_target) {
|
if (!error_from_target) {
|
||||||
/* Success! Update the last_dbid in migrateCachedSocket, so that we can
|
/* Success! Update the last_dbid in migrateCachedSocket, so that we can
|
||||||
* avoid SELECT the next time if the target DB is the same. Reply +OK. */
|
* avoid SELECT the next time if the target DB is the same. Reply +OK.
|
||||||
|
*
|
||||||
|
* Note: If we reached this point, even if socket_error is true
|
||||||
|
* still the SELECT command succeeded (otherwise the code jumps to
|
||||||
|
* socket_err label. */
|
||||||
cs->last_dbid = dbid;
|
cs->last_dbid = dbid;
|
||||||
addReply(c,shared.ok);
|
addReply(c,shared.ok);
|
||||||
} else {
|
} else {
|
||||||
@ -4972,7 +4985,6 @@ try_again:
|
|||||||
|
|
||||||
sdsfree(cmd.io.buffer.ptr);
|
sdsfree(cmd.io.buffer.ptr);
|
||||||
zfree(ov); zfree(kv); zfree(newargv);
|
zfree(ov); zfree(kv); zfree(newargv);
|
||||||
if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);
|
|
||||||
return;
|
return;
|
||||||
|
|
||||||
/* On socket errors we try to close the cached socket and try again.
|
/* On socket errors we try to close the cached socket and try again.
|
||||||
@ -4982,7 +4994,12 @@ socket_err:
|
|||||||
/* Cleanup we want to perform in both the retry and no retry case.
|
/* Cleanup we want to perform in both the retry and no retry case.
|
||||||
* Note: Closing the migrate socket will also force SELECT next time. */
|
* Note: Closing the migrate socket will also force SELECT next time. */
|
||||||
sdsfree(cmd.io.buffer.ptr);
|
sdsfree(cmd.io.buffer.ptr);
|
||||||
migrateCloseSocket(c->argv[1],c->argv[2]);
|
|
||||||
|
/* If the command was rewritten as DEL and there was a socket error,
|
||||||
|
* we already closed the socket earlier. While migrateCloseSocket()
|
||||||
|
* is idempotent, the host/port arguments are now gone, so don't do it
|
||||||
|
* again. */
|
||||||
|
if (!argv_rewritten) migrateCloseSocket(c->argv[1],c->argv[2]);
|
||||||
zfree(newargv);
|
zfree(newargv);
|
||||||
newargv = NULL; /* This will get reallocated on retry. */
|
newargv = NULL; /* This will get reallocated on retry. */
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user