diff --git a/src/cluster.c b/src/cluster.c index bedf5f81..0f4fd20e 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -4617,7 +4617,7 @@ void restoreCommand(client *c) { /* Create the key and set the TTL if any */ dbAdd(c->db,c->argv[1],obj); - if (ttl) setExpire(c->db,c->argv[1],mstime()+ttl); + if (ttl) setExpire(c,c->db,c->argv[1],mstime()+ttl); signalModifiedKey(c->db,c->argv[1]); addReply(c,shared.ok); server.dirty++; diff --git a/src/db.c b/src/db.c index 55ae663c..90a75fcf 100644 --- a/src/db.c +++ b/src/db.c @@ -190,7 +190,9 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) { * * 1) The ref count of the value object is incremented. * 2) clients WATCHing for the destination key notified. - * 3) The expire time of the key is reset (the key is made persistent). */ + * 3) The expire time of the key is reset (the key is made persistent). + * + * All the new keys in the database should be craeted via this interface. */ void setKey(redisDb *db, robj *key, robj *val) { if (lookupKeyWrite(db,key) == NULL) { dbAdd(db,key,val); @@ -330,6 +332,7 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) { slotToKeyFlush(); } } + if (dbnum == -1) flushSlaveKeysWithExpireList(); return removed; } @@ -851,7 +854,7 @@ void renameGenericCommand(client *c, int nx) { dbDelete(c->db,c->argv[2]); } dbAdd(c->db,c->argv[2],o); - if (expire != -1) setExpire(c->db,c->argv[2],expire); + if (expire != -1) setExpire(c,c->db,c->argv[2],expire); dbDelete(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[2]); @@ -917,7 +920,7 @@ void moveCommand(client *c) { return; } dbAdd(dst,c->argv[1],o); - if (expire != -1) setExpire(dst,c->argv[1],expire); + if (expire != -1) setExpire(c,dst,c->argv[1],expire); incrRefCount(o); /* OK! key moved, free the entry in the source DB */ @@ -1022,7 +1025,11 @@ int removeExpire(redisDb *db, robj *key) { return dictDelete(db->expires,key->ptr) == DICT_OK; } -void setExpire(redisDb *db, robj *key, long long when) { +/* Set an expire to the specified key. If the expire is set in the context + * of an user calling a command 'c' is the client, otherwise 'c' is set + * to NULL. The 'when' parameter is the absolute unix time in milliseconds + * after which the key will no longer be considered valid. */ +void setExpire(client *c, redisDb *db, robj *key, long long when) { dictEntry *kde, *de; /* Reuse the sds from the main dict in the expire dict */ @@ -1030,6 +1037,10 @@ void setExpire(redisDb *db, robj *key, long long when) { serverAssertWithInfo(NULL,key,kde != NULL); de = dictAddOrFind(db->expires,dictGetKey(kde)); dictSetSignedIntegerVal(de,when); + + int writable_slave = server.masterhost && server.repl_slave_ro == 0; + if (c && writable_slave && !(c->flags & CLIENT_MASTER)) + rememberSlaveKeyWithExpire(db,key); } /* Return the expire time of the specified key, or -1 if no expire diff --git a/src/expire.c b/src/expire.c index ccfa959e..4dd51cfb 100644 --- a/src/expire.c +++ b/src/expire.c @@ -216,6 +216,139 @@ void activeExpireCycle(int type) { } } +/*----------------------------------------------------------------------------- + * Expires of keys crated in writable slaves + * + * Normally slaves do not process expires: they wait the masters to synthesize + * DEL operations in order to retain consistency. However writable slaves are + * an exception: if a key is created in the slave and an expire is assigned + * to it, we need a way to expire such a key, since the master does not know + * anything about such a key. + * + * In order to do so, we track keys created in the slave side with an expire + * set, and call the expireSlaveKeys() function from time to time in order to + * reclaim the keys if they already expired. + * + * Note that the use case we are trying to cover here, is a popular one where + * slaves are put in writable mode in order to compute slow operations in + * the slave side that are mostly useful to actually read data in a more + * processed way. Think at sets intersections in a tmp key, with an expire so + * that it is also used as a cache to avoid intersecting every time. + * + * This implementation is currently not perfect but a lot better than leaking + * the keys as implemented in 3.2. + *----------------------------------------------------------------------------*/ + +/* The dictionary where we remember key names and database ID of keys we may + * want to expire from the slave. Since this function is not often used we + * don't even care to initialize the database at startup. We'll do it once + * the feature is used the first time, that is, when rememberSlaveKeyWithExpire() + * is called. + * + * The dictionary has an SDS string representing the key as the hash table + * key, while the value is a 64 bit unsigned integer with the bits corresponding + * to the DB where the keys may exist set to 1. Currently the keys created + * with a DB id > 63 are not expired, but a trivial fix is to set the bitmap + * to the max 64 bit unsigned value when we know there is a key with a DB + * ID greater than 63, and check all the configured DBs in such a case. */ +dict *slaveKeysWithExpire = NULL; + +/* Check the set of keys created by the master with an expire set in order to + * check if they should be evicted. */ +void expireSlaveKeys(void) { + if (slaveKeysWithExpire == NULL || + dictSize(slaveKeysWithExpire) == 0) return; + + int cycles = 0, noexpire = 0; + mstime_t start = mstime(); + while(1) { + dictEntry *de = dictGetRandomKey(slaveKeysWithExpire); + sds keyname = dictGetKey(de); + uint64_t dbids = dictGetUnsignedIntegerVal(de); + uint64_t new_dbids = 0; + + /* Check the key against every database corresponding to the + * bits set in the value bitmap. */ + int dbid = 0; + while(dbids && dbid < server.dbnum) { + if ((dbids & 1) != 0) { + redisDb *db = server.db+dbid; + dictEntry *expire = dictFind(db->expires,keyname); + int expired = 0; + + if (expire && + activeExpireCycleTryExpire(server.db+dbid,expire,start)) + { + expired = 1; + } + + /* If the key was not expired in this DB, we need to set the + * corresponding bit in the new bitmap we set as value. + * At the end of the loop if the bitmap is zero, it means we + * no longer need to keep track of this key. */ + if (expire && !expired) { + noexpire++; + new_dbids |= (uint64_t)1 << dbid; + } + } + dbid++; + dbids >>= 1; + } + + /* Set the new bitmap as value of the key, in the dictionary + * of keys with an expire set directly in the writable slave. Otherwise + * if the bitmap is zero, we no longer need to keep track of it. */ + if (new_dbids) + dictSetUnsignedIntegerVal(de,new_dbids); + else + dictDelete(slaveKeysWithExpire,keyname); + + /* Stop conditions: found 3 keys we cna't expire in a row or + * time limit was reached. */ + cycles++; + if (noexpire > 3) break; + if ((cycles % 64) == 0 && mstime()-start > 1) break; + if (dictSize(slaveKeysWithExpire) == 0) break; + } +} + +/* Track keys that received an EXPIRE or similar command in the context + * of a writable slave. */ +void rememberSlaveKeyWithExpire(redisDb *db, robj *key) { + if (slaveKeysWithExpire == NULL) + slaveKeysWithExpire = dictCreate(&keyptrDictType,NULL); + if (db->id > 63) return; + + dictEntry *de = dictAddOrFind(slaveKeysWithExpire,key->ptr); + /* If the entry was just created, set it to a copy of the SDS string + * representing the key: we don't want to need to take those keys + * in sync with the main DB. The keys will be removed by expireSlaveKeys() + * as it scans to find keys to remove. */ + if (de->key == key->ptr) { + de->key = sdsdup(key->ptr); + dictSetUnsignedIntegerVal(de,0); + } + + uint64_t dbids = dictGetUnsignedIntegerVal(de); + dbids |= (uint64_t)1 << db->id; + dictSetUnsignedIntegerVal(de,dbids); +} + +/* Remove the keys in the hash table. We need to do that when data is + * flushed from the server. We may receive new keys from the master with + * the same name/db and it is no longer a good idea to expire them. + * + * Note: technically we should handle the case of a single DB being flushed + * but it is not worth it since anyway race conditions using the same set + * of key names in a wriatable slave and in its master will lead to + * inconsistencies. This is just a best-effort thing we do. */ +void flushSlaveKeysWithExpireList(void) { + if (slaveKeysWithExpire) { + dictRelease(slaveKeysWithExpire); + slaveKeysWithExpire = NULL; + } +} + /*----------------------------------------------------------------------------- * Expires Commands *----------------------------------------------------------------------------*/ @@ -265,7 +398,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) { addReply(c, shared.cone); return; } else { - setExpire(c->db,key,when); + setExpire(c,c->db,key,when); addReply(c,shared.cone); signalModifiedKey(c->db,key); notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id); diff --git a/src/module.c b/src/module.c index 52b15fa3..5f85bf31 100644 --- a/src/module.c +++ b/src/module.c @@ -1342,7 +1342,7 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) { return REDISMODULE_ERR; if (expire != REDISMODULE_NO_EXPIRE) { expire += mstime(); - setExpire(key->db,key->key,expire); + setExpire(key->ctx->client,key->db,key->key,expire); } else { removeExpire(key->db,key->key); } diff --git a/src/rdb.c b/src/rdb.c index b81d0808..2689b172 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1552,7 +1552,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi) { dbAdd(db,key,val); /* Set the expire time if needed */ - if (expiretime != -1) setExpire(db,key,expiretime); + if (expiretime != -1) setExpire(NULL,db,key,expiretime); decrRefCount(key); } diff --git a/src/server.c b/src/server.c index 30951668..4d204027 100644 --- a/src/server.c +++ b/src/server.c @@ -870,8 +870,11 @@ void clientsCron(void) { void databasesCron(void) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ - if (server.active_expire_enabled && server.masterhost == NULL) + if (server.active_expire_enabled && server.masterhost == NULL) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); + } else if (server.masterhost != NULL) { + expireSlaveKeys(); + } /* Perform hash tables rehashing if needed, but only if there are no * other processes saving the DB on disk. Otherwise rehashing is bad diff --git a/src/server.h b/src/server.h index 1ad86200..0bd344e9 100644 --- a/src/server.h +++ b/src/server.h @@ -1648,7 +1648,7 @@ int removeExpire(redisDb *db, robj *key); void propagateExpire(redisDb *db, robj *key, int lazy); int expireIfNeeded(redisDb *db, robj *key); long long getExpire(redisDb *db, robj *key); -void setExpire(redisDb *db, robj *key, long long when); +void setExpire(client *c, redisDb *db, robj *key, long long when); robj *lookupKey(redisDb *db, robj *key, int flags); robj *lookupKeyRead(redisDb *db, robj *key); robj *lookupKeyWrite(redisDb *db, robj *key); @@ -1731,6 +1731,9 @@ void disconnectAllBlockedClients(void); /* expire.c -- Handling of expired keys */ void activeExpireCycle(int type); +void expireSlaveKeys(void); +void rememberSlaveKeyWithExpire(redisDb *db, robj *key); +void flushSlaveKeysWithExpireList(void); /* evict.c -- maxmemory handling and LRU eviction. */ void evictionPoolAlloc(void); diff --git a/src/t_string.c b/src/t_string.c index 8c737c4e..75375f44 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -85,7 +85,7 @@ void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, } setKey(c->db,key,val); server.dirty++; - if (expire) setExpire(c->db,key,mstime()+milliseconds); + if (expire) setExpire(c,c->db,key,mstime()+milliseconds); notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id); if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC, "expire",key,c->db->id);