From 252cfa0a39d8483b3f990af02551b63d306a289a Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 2 Oct 2015 15:27:57 +0200 Subject: [PATCH] Lazyfree: cond vars to enable/disable it based on DEL context. --- src/db.c | 23 ++++++++++++----------- src/replication.c | 5 ++++- src/server.c | 30 +++++++++++++++++++++--------- src/server.h | 28 ++++++++++++++-------------- 4 files changed, 51 insertions(+), 35 deletions(-) diff --git a/src/db.c b/src/db.c index 4cb7dfc0..9a4222fe 100644 --- a/src/db.c +++ b/src/db.c @@ -196,9 +196,8 @@ int dbSyncDelete(redisDb *db, robj *key) { /* This is a wrapper whose behavior depends on the Redis lazy free * configuration. Deletes the key synchronously or asynchronously. */ int dbDelete(redisDb *db, robj *key) { - int async = 1; /* TODO: Fixme making this a proper option. */ - if (async) return dbAsyncDelete(db,key); - else return dbSyncDelete(db,key); + return server.lazyfree_lazy_server_del ? dbAsyncDelete(db,key) : + dbSyncDelete(db,key); } /* Prepare the string object stored at 'key' to be modified destructively @@ -915,10 +914,10 @@ long long getExpire(redisDb *db, robj *key) { * AOF and the master->slave link guarantee operation ordering, everything * will be consistent even if we allow write operations against expiring * keys. */ -void propagateExpire(redisDb *db, robj *key) { +void propagateExpire(redisDb *db, robj *key, int lazy) { robj *argv[2]; - argv[0] = shared.del; + argv[0] = lazy ? shared.unlink : shared.del; argv[1] = key; incrRefCount(argv[0]); incrRefCount(argv[1]); @@ -961,10 +960,11 @@ int expireIfNeeded(redisDb *db, robj *key) { /* Delete the key */ server.stat_expiredkeys++; - propagateExpire(db,key); + propagateExpire(db,key,server.lazyfree_lazy_expire); notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired",key,db->id); - return dbDelete(db,key); + return server.lazyfree_lazy_expire ? 
dbAsyncDelete(db,key) : + dbSyncDelete(db,key); } /*----------------------------------------------------------------------------- @@ -1003,13 +1003,14 @@ void expireGenericCommand(client *c, long long basetime, int unit) { if (when <= mstime() && !server.loading && !server.masterhost) { robj *aux; - serverAssertWithInfo(c,key,dbDelete(c->db,key)); + int deleted = server.lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) : + dbSyncDelete(c->db,key); + serverAssertWithInfo(c,key,deleted); server.dirty++; - /* Replicate/AOF this as an explicit DEL. */ - aux = createStringObject("DEL",3); + /* Replicate/AOF this as an explicit DEL or UNLINK. */ + aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del; rewriteClientCommandVector(c,2,aux,key); - decrRefCount(aux); signalModifiedKey(c->db,key); notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); addReply(c, shared.cone); diff --git a/src/replication.c b/src/replication.c index 5a61f4d9..c410ca50 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1111,7 +1111,10 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { } serverLog(LL_NOTICE, "MASTER <-> SLAVE sync: Flushing old data"); signalFlushedDb(-1); - emptyDb(-1,EMPTYDB_NO_FLAGS,replicationEmptyDbCallback); + emptyDb( + -1, + server.repl_slave_lazy_flush ? 
EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS, + replicationEmptyDbCallback); /* Before loading the DB into memory we need to delete the readable * handler, otherwise it will get called recursively since * rdbLoad() will call the event loop to process events from time to diff --git a/src/server.c b/src/server.c index 0e6ed3b0..267e585e 100644 --- a/src/server.c +++ b/src/server.c @@ -740,8 +740,11 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { sds key = dictGetKey(de); robj *keyobj = createStringObject(key,sdslen(key)); - propagateExpire(db,keyobj); - dbDelete(db,keyobj); + propagateExpire(db,keyobj,server.lazyfree_lazy_expire); + if (server.lazyfree_lazy_expire) + dbAsyncDelete(db,keyobj); + else + dbSyncDelete(db,keyobj); notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired",keyobj,db->id); decrRefCount(keyobj); @@ -1405,6 +1408,7 @@ void createSharedObjects(void) { shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17); shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19); shared.del = createStringObject("DEL",3); + shared.unlink = createStringObject("UNLINK",6); shared.rpop = createStringObject("RPOP",4); shared.lpop = createStringObject("LPOP",4); shared.lpush = createStringObject("LPUSH",5); @@ -1497,10 +1501,6 @@ void initServerConfig(void) { server.zset_max_ziplist_value = OBJ_ZSET_MAX_ZIPLIST_VALUE; server.hll_sparse_max_bytes = CONFIG_DEFAULT_HLL_SPARSE_MAX_BYTES; server.shutdown_asap = 0; - server.repl_ping_slave_period = CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD; - server.repl_timeout = CONFIG_DEFAULT_REPL_TIMEOUT; - server.repl_min_slaves_to_write = CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE; - server.repl_min_slaves_max_lag = CONFIG_DEFAULT_MIN_SLAVES_MAX_LAG; server.cluster_enabled = 0; server.cluster_node_timeout = CLUSTER_DEFAULT_NODE_TIMEOUT; server.cluster_migration_barrier = CLUSTER_DEFAULT_MIGRATION_BARRIER; @@ -1514,6 +1514,9 @@ void initServerConfig(void) { server.migrate_cached_sockets = 
dictCreate(&migrateCacheDictType,NULL); server.next_client_id = 1; /* Client IDs, start from 1 .*/ server.loading_process_events_interval_bytes = (1024*1024*2); + server.lazyfree_lazy_eviction = CONFIG_DEFAULT_LAZYFREE_LAZY_EVICTION; + server.lazyfree_lazy_expire = CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE; + server.lazyfree_lazy_server_del = CONFIG_DEFAULT_LAZYFREE_LAZY_SERVER_DEL; server.lruclock = getLRUClock(); resetServerSaveParams(); @@ -1521,6 +1524,7 @@ void initServerConfig(void) { appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */ appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */ + /* Replication related */ server.masterauth = NULL; server.masterhost = NULL; @@ -1532,10 +1536,15 @@ void initServerConfig(void) { server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT; server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA; server.repl_slave_ro = CONFIG_DEFAULT_SLAVE_READ_ONLY; + server.repl_slave_lazy_flush = CONFIG_DEFAULT_SLAVE_LAZY_FLUSH; server.repl_down_since = 0; /* Never connected, repl is down since EVER. */ server.repl_disable_tcp_nodelay = CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY; server.repl_diskless_sync = CONFIG_DEFAULT_REPL_DISKLESS_SYNC; server.repl_diskless_sync_delay = CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY; + server.repl_ping_slave_period = CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD; + server.repl_timeout = CONFIG_DEFAULT_REPL_TIMEOUT; + server.repl_min_slaves_to_write = CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE; + server.repl_min_slaves_max_lag = CONFIG_DEFAULT_MIN_SLAVES_MAX_LAG; server.slave_priority = CONFIG_DEFAULT_SLAVE_PRIORITY; server.master_repl_offset = 0; @@ -3408,8 +3417,8 @@ int freeMemoryIfNeeded(void) { /* Finally remove the selected key. */ if (bestkey) { robj *keyobj = createStringObject(bestkey,sdslen(bestkey)); - propagateExpire(db,keyobj); - /* We compute the amount of memory freed by dbDelete() alone. 
+ propagateExpire(db,keyobj,server.lazyfree_lazy_eviction); + /* We compute the amount of memory freed by db*Delete() alone. * It is possible that actually the memory needed to propagate * the DEL in AOF and replication link is greater than the one * we are freeing removing the key, but we can't account for @@ -3419,7 +3428,10 @@ int freeMemoryIfNeeded(void) { * we only care about memory used by the key space. */ delta = (long long) zmalloc_used_memory(); latencyStartMonitor(eviction_latency); - dbDelete(db,keyobj); + if (server.lazyfree_lazy_eviction) + dbAsyncDelete(db,keyobj); + else + dbSyncDelete(db,keyobj); latencyEndMonitor(eviction_latency); latencyAddSampleIfNeeded("eviction-del",eviction_latency); latencyRemoveNestedEvent(latency,eviction_latency); diff --git a/src/server.h b/src/server.h index d9633e46..9f3e9b0d 100644 --- a/src/server.h +++ b/src/server.h @@ -136,6 +136,10 @@ typedef long long mstime_t; /* millisecond time type. */ #define CONFIG_BINDADDR_MAX 16 #define CONFIG_MIN_RESERVED_FDS 32 #define CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD 0 +#define CONFIG_DEFAULT_SLAVE_LAZY_FLUSH 0 +#define CONFIG_DEFAULT_LAZYFREE_LAZY_EVICTION 0 +#define CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE 0 +#define CONFIG_DEFAULT_LAZYFREE_LAZY_SERVER_DEL 0 #define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. 
*/ #define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */ @@ -611,8 +615,8 @@ struct sharedObjectsStruct { *outofrangeerr, *noscripterr, *loadingerr, *slowscripterr, *bgsaveerr, *masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr, *busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk, - *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *rpop, *lpop, - *lpush, *emptyscan, + *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink, + *rpop, *lpop, *lpush, *emptyscan, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS], *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*\r\n" */ @@ -878,6 +882,7 @@ struct redisServer { int slave_priority; /* Reported in INFO and used by Sentinel. */ char repl_master_runid[CONFIG_RUN_ID_SIZE+1]; /* Master run id for PSYNC. */ long long repl_master_initial_offset; /* Master PSYNC offset. */ + int repl_slave_lazy_flush; /* Lazy FLUSHALL before loading DB? */ /* Replication script cache. */ dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */ list *repl_scriptcache_fifo; /* First in, first out LRU eviction. */ @@ -911,8 +916,8 @@ struct redisServer { int list_max_ziplist_size; int list_compress_depth; /* time cache */ - time_t unixtime; /* Unix time sampled every cron cycle. */ - long long mstime; /* Like 'unixtime' but with milliseconds resolution. */ + time_t unixtime; /* Unix time sampled every cron cycle. */ + long long mstime; /* Like 'unixtime' but with milliseconds resolution. */ /* Pubsub */ dict *pubsub_channels; /* Map channels to list of subscribed clients */ list *pubsub_patterns; /* A list of pubsub_patterns */ @@ -941,6 +946,10 @@ struct redisServer { int lua_timedout; /* True if we reached the time limit for script execution. */ int lua_kill; /* Kill the script if true. 
*/ + /* Lazy free */ + int lazyfree_lazy_eviction; + int lazyfree_lazy_expire; + int lazyfree_lazy_server_del; /* Latency monitor */ long long latency_monitor_threshold; dict *latency_events; @@ -1368,7 +1377,7 @@ int rewriteConfig(char *path); /* db.c -- Keyspace access API */ int removeExpire(redisDb *db, robj *key); -void propagateExpire(redisDb *db, robj *key); +void propagateExpire(redisDb *db, robj *key, int lazy); int expireIfNeeded(redisDb *db, robj *key); long long getExpire(redisDb *db, robj *key); void setExpire(redisDb *db, robj *key, long long when); @@ -1402,15 +1411,6 @@ int parseScanCursorOrReply(client *c, robj *o, unsigned long *cursor); void slotToKeyAdd(robj *key); void slotToKeyDel(robj *key); void slotToKeyFlush(void); - -/* Lazy free. Note that SLOW and FAST are only useful when incremental - * lazy free is active. For threaded lazy free the actual freeing of objects - * happens in the background. Only STEP_OOM is used since it blocks waiting - * for the freeing thread to do some work before returning. */ -#define LAZYFREE_STEP_SLOW 0 /* Take 1-2 milliseconds to reclaim memory. */ -#define LAZYFREE_STEP_FAST 1 /* Free a few elements ASAP and return. */ -#define LAZYFREE_STEP_OOM 2 /* Free a few elements at any cost if there - is something to free: we are out of memory */ int dbAsyncDelete(redisDb *db, robj *key); void emptyDbAsync(redisDb *db); void slotToKeyFlushAsync(void);