Modules TSC: Improve inter-thread synchronization.

More work to do with server.unixtime and similar. Need to write Helgrind
suppression file in order to suppress the valse positives.
This commit is contained in:
antirez 2017-05-09 11:57:09 +02:00
parent 2a51bac44e
commit ece658713b
5 changed files with 75 additions and 20 deletions

View File

@ -3,18 +3,29 @@
* *
* The exported interaface is composed of three macros: * The exported interaface is composed of three macros:
* *
* atomicIncr(var,count,mutex) -- Increment the atomic counter * atomicIncr(var,count) -- Increment the atomic counter
* atomicDecr(var,count,mutex) -- Decrement the atomic counter * atomicGetIncr(var,oldvalue_var,count) -- Get and increment the atomic counter
* atomicGet(var,dstvar,mutex) -- Fetch the atomic counter value * atomicDecr(var,count) -- Decrement the atomic counter
* atomicGet(var,dstvar) -- Fetch the atomic counter value
* atomicSet(var,value) -- Set the atomic counter value
*
* The variable 'var' should also have a declared mutex with the same
* name and the "_mutex" postfix, for instance:
*
* long myvar;
* pthread_mutex_t myvar_mutex;
* atomicSet(myvar,12345);
* *
* If atomic primitives are availble (tested in config.h) the mutex * If atomic primitives are availble (tested in config.h) the mutex
* is not used. * is not used.
* *
* Never use return value from the macros. To update and get use instead: * Never use return value from the macros, instead use the AtomicGetIncr()
* if you need to get the current value and increment it atomically, like
* in the followign example:
* *
* atomicIncr(mycounter,...); * long oldvalue;
* atomicGet(mycounter,newvalue); * atomicGetIncr(myvar,oldvalue,1);
* doSomethingWith(newvalue); * doSomethingWith(oldvalue);
* *
* ---------------------------------------------------------------------------- * ----------------------------------------------------------------------------
* *
@ -55,19 +66,29 @@
/* Implementation using __atomic macros. */ /* Implementation using __atomic macros. */
#define atomicIncr(var,count) __atomic_add_fetch(&var,(count),__ATOMIC_RELAXED) #define atomicIncr(var,count) __atomic_add_fetch(&var,(count),__ATOMIC_RELAXED)
#define atomicGetIncr(var,oldvalue_var,count) do { \
oldvalue_var = __atomic_fetch_add(&var,(count),__ATOMIC_RELAXED); \
} while(0)
#define atomicDecr(var,count) __atomic_sub_fetch(&var,(count),__ATOMIC_RELAXED) #define atomicDecr(var,count) __atomic_sub_fetch(&var,(count),__ATOMIC_RELAXED)
#define atomicGet(var,dstvar) do { \ #define atomicGet(var,dstvar) do { \
dstvar = __atomic_load_n(&var,__ATOMIC_RELAXED); \ dstvar = __atomic_load_n(&var,__ATOMIC_RELAXED); \
} while(0) } while(0)
#define atomicSet(var,value) __atomic_store_n(&var,value,__ATOMIC_RELAXED)
#elif defined(HAVE_ATOMIC) #elif defined(HAVE_ATOMIC)
/* Implementation using __sync macros. */ /* Implementation using __sync macros. */
#define atomicIncr(var,count) __sync_add_and_fetch(&var,(count)) #define atomicIncr(var,count) __sync_add_and_fetch(&var,(count))
#define atomicGetIncr(var,oldvalue_var,count) do { \
oldvalue_var = __sync_fetch_and_add(&var,(count)); \
} while(0)
#define atomicDecr(var,count) __sync_sub_and_fetch(&var,(count)) #define atomicDecr(var,count) __sync_sub_and_fetch(&var,(count))
#define atomicGet(var,dstvar) do { \ #define atomicGet(var,dstvar) do { \
dstvar = __sync_sub_and_fetch(&var,0); \ dstvar = __sync_sub_and_fetch(&var,0); \
} while(0) } while(0)
#define atomicSet(var,value) do { \
while(!__sync_bool_compare_and_swap(&var,var,value)); \
} while(0)
#else #else
/* Implementation using pthread mutex. */ /* Implementation using pthread mutex. */
@ -78,6 +99,13 @@
pthread_mutex_unlock(&var ## _mutex); \ pthread_mutex_unlock(&var ## _mutex); \
} while(0) } while(0)
#define atomicGetIncr(var,oldvalue_var,count) do { \
pthread_mutex_lock(&var ## _mutex); \
oldvalue_var = var; \
var += (count); \
pthread_mutex_unlock(&var ## _mutex); \
} while(0)
#define atomicDecr(var,count) do { \ #define atomicDecr(var,count) do { \
pthread_mutex_lock(&var ## _mutex); \ pthread_mutex_lock(&var ## _mutex); \
var -= (count); \ var -= (count); \
@ -89,6 +117,12 @@
dstvar = var; \ dstvar = var; \
pthread_mutex_unlock(&var ## _mutex); \ pthread_mutex_unlock(&var ## _mutex); \
} while(0) } while(0)
#define atomicSet(var,value) do { \
pthread_mutex_lock(&var ## _mutex); \
var = value; \
pthread_mutex_unlock(&var ## _mutex); \
} while(0)
#endif #endif
#endif /* __ATOMIC_VAR_H */ #endif /* __ATOMIC_VAR_H */

View File

@ -32,6 +32,7 @@
#include "server.h" #include "server.h"
#include "bio.h" #include "bio.h"
#include "atomicvar.h"
/* ---------------------------------------------------------------------------- /* ----------------------------------------------------------------------------
* Data structures * Data structures
@ -72,6 +73,20 @@ unsigned int getLRUClock(void) {
return (mstime()/LRU_CLOCK_RESOLUTION) & LRU_CLOCK_MAX; return (mstime()/LRU_CLOCK_RESOLUTION) & LRU_CLOCK_MAX;
} }
/* This function is used to obtain the current LRU clock.
* If the current resolution is lower than the frequency we refresh the
* LRU clock (as it should be in production servers) we return the
* precomputed value, otherwise we need to resort to a system call. */
unsigned int LRU_CLOCK(void) {
unsigned int lruclock;
if (1000/server.hz <= LRU_CLOCK_RESOLUTION) {
atomicGet(server.lruclock,lruclock);
} else {
lruclock = getLRUClock();
}
return lruclock;
}
/* Given an object returns the min number of milliseconds the object was never /* Given an object returns the min number of milliseconds the object was never
* requested, using an approximated LRU algorithm. */ * requested, using an approximated LRU algorithm. */
unsigned long long estimateObjectIdleTime(robj *o) { unsigned long long estimateObjectIdleTime(robj *o) {

View File

@ -28,6 +28,7 @@
*/ */
#include "server.h" #include "server.h"
#include "atomicvar.h"
#include <sys/uio.h> #include <sys/uio.h>
#include <math.h> #include <math.h>
#include <ctype.h> #include <ctype.h>
@ -88,7 +89,9 @@ client *createClient(int fd) {
} }
selectDb(c,0); selectDb(c,0);
c->id = server.next_client_id++; uint64_t client_id;
atomicGetIncr(server.next_client_id,client_id,1);
c->id = client_id;
c->fd = fd; c->fd = fd;
c->name = NULL; c->name = NULL;
c->bufpos = 0; c->bufpos = 0;

View File

@ -32,6 +32,7 @@
#include "slowlog.h" #include "slowlog.h"
#include "bio.h" #include "bio.h"
#include "latency.h" #include "latency.h"
#include "atomicvar.h"
#include <time.h> #include <time.h>
#include <signal.h> #include <signal.h>
@ -68,7 +69,8 @@ double R_Zero, R_PosInf, R_NegInf, R_Nan;
/*================================= Globals ================================= */ /*================================= Globals ================================= */
/* Global vars */ /* Global vars */
struct redisServer server; /* server global state */ struct redisServer server; /* Server global state */
volatile unsigned long lru_clock; /* Server global current LRU time. */
/* Our command table. /* Our command table.
* *
@ -976,7 +978,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
* *
* Note that you can change the resolution altering the * Note that you can change the resolution altering the
* LRU_CLOCK_RESOLUTION define. */ * LRU_CLOCK_RESOLUTION define. */
server.lruclock = getLRUClock(); unsigned long lruclock = getLRUClock();
atomicSet(server.lruclock,lruclock);
/* Record the max memory used since the server was started. */ /* Record the max memory used since the server was started. */
if (zmalloc_used_memory() > server.stat_peak_memory) if (zmalloc_used_memory() > server.stat_peak_memory)
@ -1420,6 +1423,7 @@ void initServerConfig(void) {
server.cluster_announce_bus_port = CONFIG_DEFAULT_CLUSTER_ANNOUNCE_BUS_PORT; server.cluster_announce_bus_port = CONFIG_DEFAULT_CLUSTER_ANNOUNCE_BUS_PORT;
server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL); server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
server.next_client_id = 1; /* Client IDs, start from 1 .*/ server.next_client_id = 1; /* Client IDs, start from 1 .*/
pthread_mutex_init(&server.next_client_id_mutex,NULL);
server.loading_process_events_interval_bytes = (1024*1024*2); server.loading_process_events_interval_bytes = (1024*1024*2);
server.lazyfree_lazy_eviction = CONFIG_DEFAULT_LAZYFREE_LAZY_EVICTION; server.lazyfree_lazy_eviction = CONFIG_DEFAULT_LAZYFREE_LAZY_EVICTION;
server.lazyfree_lazy_expire = CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE; server.lazyfree_lazy_expire = CONFIG_DEFAULT_LAZYFREE_LAZY_EXPIRE;
@ -1427,7 +1431,8 @@ void initServerConfig(void) {
server.always_show_logo = CONFIG_DEFAULT_ALWAYS_SHOW_LOGO; server.always_show_logo = CONFIG_DEFAULT_ALWAYS_SHOW_LOGO;
server.lua_time_limit = LUA_SCRIPT_TIME_LIMIT; server.lua_time_limit = LUA_SCRIPT_TIME_LIMIT;
server.lruclock = getLRUClock(); unsigned int lruclock = getLRUClock();
atomicSet(server.lruclock,lruclock);
resetServerSaveParams(); resetServerSaveParams();
appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
@ -2809,6 +2814,8 @@ sds genRedisInfoString(char *section) {
call_uname = 0; call_uname = 0;
} }
unsigned int lruclock;
atomicGet(server.lruclock,lruclock);
info = sdscatprintf(info, info = sdscatprintf(info,
"# Server\r\n" "# Server\r\n"
"redis_version:%s\r\n" "redis_version:%s\r\n"
@ -2848,7 +2855,7 @@ sds genRedisInfoString(char *section) {
(intmax_t)uptime, (intmax_t)uptime,
(intmax_t)(uptime/(3600*24)), (intmax_t)(uptime/(3600*24)),
server.hz, server.hz,
(unsigned long) server.lruclock, (unsigned long) lruclock,
server.executable ? server.executable : "", server.executable ? server.executable : "",
server.configfile ? server.configfile : ""); server.configfile ? server.configfile : "");
} }

View File

@ -563,19 +563,13 @@ typedef struct RedisModuleIO {
typedef struct redisObject { typedef struct redisObject {
unsigned type:4; unsigned type:4;
unsigned encoding:4; unsigned encoding:4;
unsigned lru:LRU_BITS; /* LRU time (relative to server.lruclock) or unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency * LFU data (least significant 8 bits frequency
* and most significant 16 bits decreas time). */ * and most significant 16 bits decreas time). */
int refcount; int refcount;
void *ptr; void *ptr;
} robj; } robj;
/* Macro used to obtain the current LRU clock.
* If the current resolution is lower than the frequency we refresh the
* LRU clock (as it should be in production servers) we return the
* precomputed value, otherwise we need to resort to a system call. */
#define LRU_CLOCK() ((1000/server.hz <= LRU_CLOCK_RESOLUTION) ? server.lruclock : getLRUClock())
/* Macro used to initialize a Redis object allocated on the stack. /* Macro used to initialize a Redis object allocated on the stack.
* Note that this macro is taken near the structure definition to make sure * Note that this macro is taken near the structure definition to make sure
* we'll update it when the structure is changed, to avoid bugs like * we'll update it when the structure is changed, to avoid bugs like
@ -866,7 +860,7 @@ struct redisServer {
dict *commands; /* Command table */ dict *commands; /* Command table */
dict *orig_commands; /* Command table before command renaming. */ dict *orig_commands; /* Command table before command renaming. */
aeEventLoop *el; aeEventLoop *el;
unsigned lruclock:LRU_BITS; /* Clock for LRU eviction */ unsigned int lruclock; /* Clock for LRU eviction */
int shutdown_asap; /* SHUTDOWN needed ASAP */ int shutdown_asap; /* SHUTDOWN needed ASAP */
int activerehashing; /* Incremental rehash in serverCron() */ int activerehashing; /* Incremental rehash in serverCron() */
int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */ int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */
@ -906,6 +900,7 @@ struct redisServer {
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
dict *migrate_cached_sockets;/* MIGRATE cached sockets */ dict *migrate_cached_sockets;/* MIGRATE cached sockets */
uint64_t next_client_id; /* Next client unique ID. Incremental. */ uint64_t next_client_id; /* Next client unique ID. Incremental. */
pthread_mutex_t next_client_id_mutex;
int protected_mode; /* Don't accept external connections. */ int protected_mode; /* Don't accept external connections. */
/* RDB / AOF loading information */ /* RDB / AOF loading information */
int loading; /* We are loading data from disk if true */ int loading; /* We are loading data from disk if true */
@ -1608,6 +1603,7 @@ void updateCachedTime(void);
void resetServerStats(void); void resetServerStats(void);
void activeDefragCycle(void); void activeDefragCycle(void);
unsigned int getLRUClock(void); unsigned int getLRUClock(void);
unsigned int LRU_CLOCK(void);
const char *evictPolicyToString(void); const char *evictPolicyToString(void);
struct redisMemOverhead *getMemoryOverheadData(void); struct redisMemOverhead *getMemoryOverheadData(void);
void freeMemoryOverheadData(struct redisMemOverhead *mh); void freeMemoryOverheadData(struct redisMemOverhead *mh);