mirror of
https://github.com/fluencelabs/redis
synced 2025-03-17 16:10:50 +00:00
Lazyfree: cond vars to enabled/disable it based on DEL context.
This commit is contained in:
parent
5359696796
commit
252cfa0a39
23
src/db.c
23
src/db.c
@ -196,9 +196,8 @@ int dbSyncDelete(redisDb *db, robj *key) {
|
||||
/* This is a wrapper whose behavior depends on the Redis lazy free
|
||||
* configuration. Deletes the key synchronously or asynchronously. */
|
||||
int dbDelete(redisDb *db, robj *key) {
|
||||
int async = 1; /* TODO: Fixme making this a proper option. */
|
||||
if (async) return dbAsyncDelete(db,key);
|
||||
else return dbSyncDelete(db,key);
|
||||
return server.lazyfree_lazy_server_del ? dbAsyncDelete(db,key) :
|
||||
dbSyncDelete(db,key);
|
||||
}
|
||||
|
||||
/* Prepare the string object stored at 'key' to be modified destructively
|
||||
@ -915,10 +914,10 @@ long long getExpire(redisDb *db, robj *key) {
|
||||
* AOF and the master->slave link guarantee operation ordering, everything
|
||||
* will be consistent even if we allow write operations against expiring
|
||||
* keys. */
|
||||
void propagateExpire(redisDb *db, robj *key) {
|
||||
void propagateExpire(redisDb *db, robj *key, int lazy) {
|
||||
robj *argv[2];
|
||||
|
||||
argv[0] = shared.del;
|
||||
argv[0] = lazy ? shared.unlink : shared.del;
|
||||
argv[1] = key;
|
||||
incrRefCount(argv[0]);
|
||||
incrRefCount(argv[1]);
|
||||
@ -961,10 +960,11 @@ int expireIfNeeded(redisDb *db, robj *key) {
|
||||
|
||||
/* Delete the key */
|
||||
server.stat_expiredkeys++;
|
||||
propagateExpire(db,key);
|
||||
propagateExpire(db,key,server.lazyfree_lazy_expire);
|
||||
notifyKeyspaceEvent(NOTIFY_EXPIRED,
|
||||
"expired",key,db->id);
|
||||
return dbDelete(db,key);
|
||||
return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
|
||||
dbSyncDelete(db,key);
|
||||
}
|
||||
|
||||
/*-----------------------------------------------------------------------------
|
||||
@ -1003,13 +1003,14 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
|
||||
if (when <= mstime() && !server.loading && !server.masterhost) {
|
||||
robj *aux;
|
||||
|
||||
serverAssertWithInfo(c,key,dbDelete(c->db,key));
|
||||
int deleted = server.lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) :
|
||||
dbSyncDelete(c->db,key);
|
||||
serverAssertWithInfo(c,key,deleted);
|
||||
server.dirty++;
|
||||
|
||||
/* Replicate/AOF this as an explicit DEL. */
|
||||
aux = createStringObject("DEL",3);
|
||||
/* Replicate/AOF this as an explicit DEL or UNLINK. */
|
||||
aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
|
||||
rewriteClientCommandVector(c,2,aux,key);
|
||||
decrRefCount(aux);
|
||||
signalModifiedKey(c->db,key);
|
||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
|
||||
addReply(c, shared.cone);
|
||||
|
@ -1111,7 +1111,10 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
}
|
||||
serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Flushing old data");
|
||||
signalFlushedDb(-1);
|
||||
emptyDb(-1,EMPTYDB_NO_FLAGS,replicationEmptyDbCallback);
|
||||
emptyDb(
|
||||
-1,
|
||||
server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS,
|
||||
replicationEmptyDbCallback);
|
||||
/* Before loading the DB into memory we need to delete the readable
|
||||
* handler, otherwise it will get called recursively since
|
||||
* rdbLoad() will call the event loop to process events from time to
|
||||
|
30
src/server.c
30
src/server.c
@ -740,8 +740,11 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
|
||||
sds key = dictGetKey(de);
|
||||
robj *keyobj = createStringObject(key,sdslen(key));
|
||||
|
||||
propagateExpire(db,keyobj);
|
||||
dbDelete(db,keyobj);
|
||||
propagateExpire(db,keyobj,server.lazyfree_lazy_expire);
|
||||
if (server.lazyfree_lazy_expire)
|
||||
dbAsyncDelete(db,keyobj);
|
||||
else
|
||||
dbSyncDelete(db,keyobj);
|
||||
notifyKeyspaceEvent(NOTIFY_EXPIRED,
|
||||
"expired",keyobj,db->id);
|
||||
decrRefCount(keyobj);
|
||||
@ -1405,6 +1408,7 @@ void createSharedObjects(void) {
|
||||
shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
|
||||
shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
|
||||
shared.del = createStringObject("DEL",3);
|
||||
shared.unlink = createStringObject("UNLINK",6);
|
||||
shared.rpop = createStringObject("RPOP",4);
|
||||
shared.lpop = createStringObject("LPOP",4);
|
||||
shared.lpush = createStringObject("LPUSH",5);
|
||||
@ -1497,10 +1501,6 @@ void initServerConfig(void) {
|
||||
server.zset_max_ziplist_value = OBJ_ZSET_MAX_ZIPLIST_VALUE;
|
||||
server.hll_sparse_max_bytes = CONFIG_DEFAULT_HLL_SPARSE_MAX_BYTES;
|
||||
server.shutdown_asap = 0;
|
||||
server.repl_ping_slave_period = CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD;
|
||||
server.repl_timeout = CONFIG_DEFAULT_REPL_TIMEOUT;
|
||||
server.repl_min_slaves_to_write = CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE;
|
||||
server.repl_min_slaves_max_lag = CONFIG_DEFAULT_MIN_SLAVES_MAX_LAG;
|
||||
server.cluster_enabled = 0;
|
||||
server.cluster_node_timeout = CLUSTER_DEFAULT_NODE_TIMEOUT;
|
||||
server.cluster_migration_barrier = CLUSTER_DEFAULT_MIGRATION_BARRIER;
|
||||
@ -1514,6 +1514,9 @@ void initServerConfig(void) {
|
||||
server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
|
||||
server.next_client_id = 1; /* Client IDs, start from 1 .*/
|
||||
server.loading_process_events_interval_bytes = (1024*1024*2);
|
||||
server.lazyfree_lazy_eviction = CONFIG_DEFAULT_LAZYFREE_LAZY_EVICTION;
|
||||
server.lazyfree_lazy_expire = CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE;
|
||||
server.lazyfree_lazy_server_del = CONFIG_DEFAULT_LAZYFREE_LAZY_SERVER_DEL;
|
||||
|
||||
server.lruclock = getLRUClock();
|
||||
resetServerSaveParams();
|
||||
@ -1521,6 +1524,7 @@ void initServerConfig(void) {
|
||||
appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
|
||||
appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
|
||||
appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
|
||||
|
||||
/* Replication related */
|
||||
server.masterauth = NULL;
|
||||
server.masterhost = NULL;
|
||||
@ -1532,10 +1536,15 @@ void initServerConfig(void) {
|
||||
server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
|
||||
server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA;
|
||||
server.repl_slave_ro = CONFIG_DEFAULT_SLAVE_READ_ONLY;
|
||||
server.repl_slave_lazy_flush = CONFIG_DEFAULT_SLAVE_LAZY_FLUSH;
|
||||
server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
|
||||
server.repl_disable_tcp_nodelay = CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY;
|
||||
server.repl_diskless_sync = CONFIG_DEFAULT_REPL_DISKLESS_SYNC;
|
||||
server.repl_diskless_sync_delay = CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY;
|
||||
server.repl_ping_slave_period = CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD;
|
||||
server.repl_timeout = CONFIG_DEFAULT_REPL_TIMEOUT;
|
||||
server.repl_min_slaves_to_write = CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE;
|
||||
server.repl_min_slaves_max_lag = CONFIG_DEFAULT_MIN_SLAVES_MAX_LAG;
|
||||
server.slave_priority = CONFIG_DEFAULT_SLAVE_PRIORITY;
|
||||
server.master_repl_offset = 0;
|
||||
|
||||
@ -3408,8 +3417,8 @@ int freeMemoryIfNeeded(void) {
|
||||
/* Finally remove the selected key. */
|
||||
if (bestkey) {
|
||||
robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
|
||||
propagateExpire(db,keyobj);
|
||||
/* We compute the amount of memory freed by dbDelete() alone.
|
||||
propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
|
||||
/* We compute the amount of memory freed by db*Delete() alone.
|
||||
* It is possible that actually the memory needed to propagate
|
||||
* the DEL in AOF and replication link is greater than the one
|
||||
* we are freeing removing the key, but we can't account for
|
||||
@ -3419,7 +3428,10 @@ int freeMemoryIfNeeded(void) {
|
||||
* we only care about memory used by the key space. */
|
||||
delta = (long long) zmalloc_used_memory();
|
||||
latencyStartMonitor(eviction_latency);
|
||||
dbDelete(db,keyobj);
|
||||
if (server.lazyfree_lazy_eviction)
|
||||
dbAsyncDelete(db,keyobj);
|
||||
else
|
||||
dbSyncDelete(db,keyobj);
|
||||
latencyEndMonitor(eviction_latency);
|
||||
latencyAddSampleIfNeeded("eviction-del",eviction_latency);
|
||||
latencyRemoveNestedEvent(latency,eviction_latency);
|
||||
|
28
src/server.h
28
src/server.h
@ -136,6 +136,10 @@ typedef long long mstime_t; /* millisecond time type. */
|
||||
#define CONFIG_BINDADDR_MAX 16
|
||||
#define CONFIG_MIN_RESERVED_FDS 32
|
||||
#define CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD 0
|
||||
#define CONFIG_DEFAULT_SLAVE_LAZY_FLUSH 0
|
||||
#define CONFIG_DEFAULT_LAZYFREE_LAZY_EVICTION 0
|
||||
#define CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE 0
|
||||
#define CONFIG_DEFAULT_LAZYFREE_LAZY_SERVER_DEL 0
|
||||
|
||||
#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */
|
||||
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */
|
||||
@ -611,8 +615,8 @@ struct sharedObjectsStruct {
|
||||
*outofrangeerr, *noscripterr, *loadingerr, *slowscripterr, *bgsaveerr,
|
||||
*masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr,
|
||||
*busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
|
||||
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *rpop, *lpop,
|
||||
*lpush, *emptyscan,
|
||||
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink,
|
||||
*rpop, *lpop, *lpush, *emptyscan,
|
||||
*select[PROTO_SHARED_SELECT_CMDS],
|
||||
*integers[OBJ_SHARED_INTEGERS],
|
||||
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
|
||||
@ -878,6 +882,7 @@ struct redisServer {
|
||||
int slave_priority; /* Reported in INFO and used by Sentinel. */
|
||||
char repl_master_runid[CONFIG_RUN_ID_SIZE+1]; /* Master run id for PSYNC. */
|
||||
long long repl_master_initial_offset; /* Master PSYNC offset. */
|
||||
int repl_slave_lazy_flush; /* Lazy FLUSHALL before loading DB? */
|
||||
/* Replication script cache. */
|
||||
dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */
|
||||
list *repl_scriptcache_fifo; /* First in, first out LRU eviction. */
|
||||
@ -911,8 +916,8 @@ struct redisServer {
|
||||
int list_max_ziplist_size;
|
||||
int list_compress_depth;
|
||||
/* time cache */
|
||||
time_t unixtime; /* Unix time sampled every cron cycle. */
|
||||
long long mstime; /* Like 'unixtime' but with milliseconds resolution. */
|
||||
time_t unixtime; /* Unix time sampled every cron cycle. */
|
||||
long long mstime; /* Like 'unixtime' but with milliseconds resolution. */
|
||||
/* Pubsub */
|
||||
dict *pubsub_channels; /* Map channels to list of subscribed clients */
|
||||
list *pubsub_patterns; /* A list of pubsub_patterns */
|
||||
@ -941,6 +946,10 @@ struct redisServer {
|
||||
int lua_timedout; /* True if we reached the time limit for script
|
||||
execution. */
|
||||
int lua_kill; /* Kill the script if true. */
|
||||
/* Lazy free */
|
||||
int lazyfree_lazy_eviction;
|
||||
int lazyfree_lazy_expire;
|
||||
int lazyfree_lazy_server_del;
|
||||
/* Latency monitor */
|
||||
long long latency_monitor_threshold;
|
||||
dict *latency_events;
|
||||
@ -1368,7 +1377,7 @@ int rewriteConfig(char *path);
|
||||
|
||||
/* db.c -- Keyspace access API */
|
||||
int removeExpire(redisDb *db, robj *key);
|
||||
void propagateExpire(redisDb *db, robj *key);
|
||||
void propagateExpire(redisDb *db, robj *key, int lazy);
|
||||
int expireIfNeeded(redisDb *db, robj *key);
|
||||
long long getExpire(redisDb *db, robj *key);
|
||||
void setExpire(redisDb *db, robj *key, long long when);
|
||||
@ -1402,15 +1411,6 @@ int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor);
|
||||
void slotToKeyAdd(robj *key);
|
||||
void slotToKeyDel(robj *key);
|
||||
void slotToKeyFlush(void);
|
||||
|
||||
/* Lazy free. Note that SLOW and FAST are only useful when incremental
|
||||
* lazy free is active. For threaded lazy free the actual freeing of objects
|
||||
* happens in the background. Only STEP_OOM is used since it blocks waiting
|
||||
* for the freeing thread to do some work before returning. */
|
||||
#define LAZYFREE_STEP_SLOW 0 /* Take 1-2 milliseconds to reclaim memory. */
|
||||
#define LAZYFREE_STEP_FAST 1 /* Free a few elements ASAP and return. */
|
||||
#define LAZYFREE_STEP_OOM 2 /* Free a few elements at any cost if there
|
||||
is something to free: we are out of memory */
|
||||
int dbAsyncDelete(redisDb *db, robj *key);
|
||||
void emptyDbAsync(redisDb *db);
|
||||
void slotToKeyFlushAsync(void);
|
||||
|
Loading…
x
Reference in New Issue
Block a user