mirror of
https://github.com/fluencelabs/redis
synced 2025-03-17 08:00:49 +00:00
cron part of disk store object cache implemented. Objects are pushed as IO jobs if needed, so that the IO thread will process them.
This commit is contained in:
parent
cea8c5cd75
commit
f63f0928c3
@ -349,6 +349,7 @@ void *IOThreadEntryPoint(void *arg) {
|
||||
j->val = dsGet(j->db,j->key);
|
||||
redisAssert(j->val != NULL);
|
||||
} else if (j->type == REDIS_IOJOB_SAVE) {
|
||||
redisAssert(j->val->storage == REDIS_DS_SAVING);
|
||||
if (j->val)
|
||||
dsSet(j->db,j->key,j->val);
|
||||
else
|
||||
@ -443,6 +444,66 @@ void dsCreateIOJob(int type, redisDb *db, robj *key, robj *val) {
|
||||
unlockThreadedIO();
|
||||
}
|
||||
|
||||
void cacheScheduleForFlush(redisDb *db, robj *key) {
|
||||
dirtykey *dk;
|
||||
dictEntry *de;
|
||||
|
||||
de = dictFind(db->dict,key->ptr);
|
||||
if (de) {
|
||||
robj *val = dictGetEntryVal(de);
|
||||
if (val->storage == REDIS_DS_DIRTY)
|
||||
return;
|
||||
else
|
||||
val->storage = REDIS_DS_DIRTY;
|
||||
}
|
||||
|
||||
dk = zmalloc(sizeof(*dk));
|
||||
dk->db = db;
|
||||
dk->key = key;
|
||||
incrRefCount(key);
|
||||
dk->ctime = time(NULL);
|
||||
listAddNodeTail(server.cache_flush_queue, key);
|
||||
}
|
||||
|
||||
void cacheCron(void) {
|
||||
time_t now = time(NULL);
|
||||
listNode *ln;
|
||||
|
||||
/* Sync stuff on disk */
|
||||
while((ln = listFirst(server.cache_flush_queue)) != NULL) {
|
||||
dirtykey *dk = ln->value;
|
||||
|
||||
if ((now - dk->ctime) >= server.cache_flush_delay) {
|
||||
struct dictEntry *de;
|
||||
robj *val;
|
||||
|
||||
/* Lookup the key. We need to check if it's still here and
|
||||
* possibly access to the value. */
|
||||
de = dictFind(dk->db->dict,dk->key->ptr);
|
||||
if (de) {
|
||||
val = dictGetEntryVal(de);
|
||||
redisAssert(val->storage == REDIS_DS_DIRTY);
|
||||
val->storage = REDIS_DS_SAVING;
|
||||
} else {
|
||||
/* Setting the value to NULL tells the IO thread to delete
|
||||
* the key on disk. */
|
||||
val = NULL;
|
||||
}
|
||||
dsCreateIOJob(REDIS_IOJOB_SAVE,dk->db,dk->key,val);
|
||||
listDelNode(server.cache_flush_queue,ln);
|
||||
} else {
|
||||
break; /* too early */
|
||||
}
|
||||
}
|
||||
|
||||
/* Reclaim memory from the object cache */
|
||||
while (server.ds_enabled && zmalloc_used_memory() >
|
||||
server.cache_max_memory)
|
||||
{
|
||||
if (cacheFreeOneEntry() == REDIS_ERR) break;
|
||||
}
|
||||
}
|
||||
|
||||
/* ============ Virtual Memory - Blocking clients on missing keys =========== */
|
||||
|
||||
/* This function makes the clinet 'c' waiting for the key 'key' to be loaded.
|
||||
@ -454,6 +515,7 @@ void dsCreateIOJob(int type, redisDb *db, robj *key, robj *val) {
|
||||
* - if it's REDIS_DS_SAVING the key is being saved by an IO Job. When
|
||||
* the client will lookup the key it will block if the key is still
|
||||
* in this stage but it's more or less the best we can do.
|
||||
*
|
||||
* FIXME: we should try if it's actually better to suspend the client
|
||||
* accessing an object that is being saved, and awake it only when
|
||||
* the saving was completed.
|
||||
|
@ -620,11 +620,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
||||
|
||||
/* Remove a few cached objects from memory if we are over the
|
||||
* configured memory limit */
|
||||
while (server.ds_enabled && zmalloc_used_memory() >
|
||||
server.cache_max_memory)
|
||||
{
|
||||
if (cacheFreeOneEntry() == REDIS_ERR) break;
|
||||
}
|
||||
if (server.ds_enabled) cacheCron();
|
||||
|
||||
/* Replication cron function -- used to reconnect to master and
|
||||
* to detect transfer failures. */
|
||||
|
@ -800,6 +800,8 @@ int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd);
|
||||
int dontWaitForSwappedKey(redisClient *c, robj *key);
|
||||
void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key);
|
||||
int cacheFreeOneEntry(void);
|
||||
void cacheScheduleForFlush(redisDb *db, robj *key);
|
||||
void cacheCron(void);
|
||||
|
||||
/* Set data type */
|
||||
robj *setTypeCreate(robj *value);
|
||||
|
Loading…
x
Reference in New Issue
Block a user