diff --git a/TODO b/TODO index 3825948d..015a01a1 100644 --- a/TODO +++ b/TODO @@ -4,28 +4,20 @@ VERSION 1.4 TODO (Hash type) ============================ * BRPOPLPUSH -* RPOPLPUSH should notify blocking POP operations * List ops like L/RPUSH L/RPOP should return the new list length. * Save dataset / fsync() on SIGTERM -* MULTI/EXEC should support the "EXEC FSYNC" form -* Synchronous Virtual Memory +* MULTI/EXEC should support the "EXEC FSYNC" form? * BLPOP & C. tests (write a non blocking Tcl client as first step) Virtual Memory sub-TODO: -* Check if the page selection algorithm is working well. +* Check if the page selection algorithm is working well * Divide swappability of objects by refcount -* vm-swap-file . The swap file should go where the user wants, and if it's already there and of the right size we can avoid to create it again. * it should be possible to give the vm-max-memory option in megabyte, gigabyte, ..., just using 2GB, 100MB, and so forth. -* redis-cli vmstat, calling INFO every second and printing VM stats ala vmstat. -* protect zmalloc memory usage increments with a mutex +* Try to understand what can be moved into I/O threads that currently is instead handled by the main thread. For instance swapping file table scannig to find contiguous page could be a potential candidate (but I'm not convinced it's a good idea, better to improve the algorithm, for instance double the fast forward at every step?). -VERSION 1.6 TODO (Virtual memory) -================================= +* Hashes (HSET, HGET, HDEL, HEXISTS, HLEN, ...). -* Asynchronous Virtual Memory -* Hashes (HSET, HGET, HEXISTS, HLEN, ...). - -VERSION 1.8 TODO (Fault tollerant sharding) +VERSION 2.2 TODO (Fault tolerant sharding) =========================================== * Redis-cluster, a fast intermediate layer (proxy) that implements consistent hashing and fault tollerant nodes handling. @@ -34,7 +26,7 @@ Interesting readings about this: - http://ayende.com/Blog/archive/2009/04/06/designing-rhino-dht-a-fault-tolerant-dynamically-distributed-hash.aspx -VERSION 2.0 TODO (Optimizations and latency) +VERSION 2.4 TODO (Optimizations and latency) ============================================ * Lower the CPU usage. @@ -42,7 +34,7 @@ VERSION 2.0 TODO (Optimizations and latency) * Use epool and alike to rewrite ae.c for Linux and other platforms suppporting fater-than-select() mutiplexing APIs. * Implement an UDP interface for low-latency GET/SET operations. -VERSION 2.2 TODO (Optimizations and latency) +VERSION 2.6 TODO (Optimizations and latency) ============================================ * JSON command able to access data serialized in JSON format. For instance if I've a key foobar with a json object I can alter the "name" file using somthing like: "JSON SET foobar name Kevin". We should have GET and INCRBY as well. diff --git a/redis.c b/redis.c index 76784deb..c187b266 100644 --- a/redis.c +++ b/redis.c @@ -27,7 +27,7 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#define REDIS_VERSION "1.3.2" +#define REDIS_VERSION "1.3.3" #include "fmacros.h" #include "config.h" @@ -543,7 +543,7 @@ static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int static void initClientMultiState(redisClient *c); static void freeClientMultiState(redisClient *c); static void queueMultiCommand(redisClient *c, struct redisCommand *cmd); -static void unblockClient(redisClient *c); +static void unblockClientWaitingData(redisClient *c); static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele); static void vmInit(void); static void vmMarkPagesFree(off_t page, off_t count); @@ -670,7 +670,7 @@ static struct redisCommand cmdTable[] = { {"lrange",lrangeCommand,4,REDIS_CMD_INLINE}, {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE}, {"lrem",lremCommand,4,REDIS_CMD_BULK}, - {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, + {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM}, {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM}, {"srem",sremCommand,3,REDIS_CMD_BULK}, {"smove",smoveCommand,4,REDIS_CMD_BULK}, @@ -1038,7 +1038,7 @@ static void closeTimedoutClients(void) { } else if (c->flags & REDIS_BLOCKED) { if (c->blockingto != 0 && c->blockingto < now) { addReply(c,shared.nullmultibulk); - unblockClient(c); + unblockClientWaitingData(c); } } } @@ -1672,14 +1672,14 @@ static void freeClient(redisClient *c) { listNode *ln; /* Note that if the client we are freeing is blocked into a blocking - * call, we have to set querybuf to NULL *before* to call unblockClient() - * to avoid processInputBuffer() will get called. Also it is important - * to remove the file events after this, because this call adds - * the READABLE event. */ + * call, we have to set querybuf to NULL *before* to call + * unblockClientWaitingData() to avoid processInputBuffer() will get + * called. Also it is important to remove the file events after + * this, because this call adds the READABLE event. */ sdsfree(c->querybuf); c->querybuf = NULL; if (c->flags & REDIS_BLOCKED) - unblockClient(c); + unblockClientWaitingData(c); aeDeleteFileEvent(server.el,c->fd,AE_READABLE); aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); @@ -5951,7 +5951,7 @@ static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeou } /* Unblock a client that's waiting in a blocking operation such as BLPOP */ -static void unblockClient(redisClient *c) { +static void unblockClientWaitingData(redisClient *c) { dictEntry *de; list *l; int j; @@ -5975,16 +5975,17 @@ static void unblockClient(redisClient *c) { c->flags &= (~REDIS_BLOCKED); server.blockedclients--; /* Ok now we are ready to get read events from socket, note that we - * can't trap errors here as it's possible that unblockClients() is + * can't trap errors here as it's possible that unblockClientWaitingDatas() is * called from freeClient() itself, and the only thing we can do * if we failed to register the READABLE event is to kill the client. * Still the following function should never fail in the real world as * we are sure the file descriptor is sane, and we exit on out of mem. */ aeCreateFileEvent(server.el, c->fd, AE_READABLE, readQueryFromClient, c); /* As a final step we want to process data if there is some command waiting - * in the input buffer. Note that this is safe even if unblockClient() - * gets called from freeClient() because freeClient() will be smart - * enough to call this function *after* c->querybuf was set to NULL. */ + * in the input buffer. Note that this is safe even if + * unblockClientWaitingData() gets called from freeClient() because + * freeClient() will be smart enough to call this function + * *after* c->querybuf was set to NULL. */ if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c); } @@ -6018,7 +6019,7 @@ static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { addReplyBulkLen(receiver,ele); addReply(receiver,ele); addReply(receiver,shared.crlf); - unblockClient(receiver); + unblockClientWaitingData(receiver); return 1; } @@ -7395,7 +7396,7 @@ static int vmSwapOneObject(int usethreads) { for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; - int maxtries = 1000; + int maxtries = 100; if (dictSize(db->dict) == 0) continue; for (i = 0; i < 5; i++) { @@ -7500,9 +7501,7 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int mask) { char buf[1]; - int retval; - int processed = 0; - int toprocess = -1; + int retval, processed = 0, toprocess = -1, trytoswap = 1; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); REDIS_NOTUSED(privdata); @@ -7599,7 +7598,9 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, freeIOJob(j); /* Put a few more swap requests in queue if we are still * out of memory */ - if (vmCanSwapOut() && zmalloc_used_memory() > server.vm_max_memory){ + if (trytoswap && vmCanSwapOut() && + zmalloc_used_memory() > server.vm_max_memory) + { int more = 1; while(more) { lockThreadedIO(); @@ -7607,7 +7608,10 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, (unsigned) server.vm_max_threads; unlockThreadedIO(); /* Don't waste CPU time if swappable objects are rare. */ - if (vmSwapOneObjectThreaded() == REDIS_ERR) break; + if (vmSwapOneObjectThreaded() == REDIS_ERR) { + trytoswap = 0; + break; + } } } } @@ -7839,6 +7843,19 @@ static int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) { return REDIS_OK; } +/* ============ Virtual Memory - Blocking clients on missing keys =========== */ + +/* Is this client attempting to run a command against swapped keys? + * If so, block it ASAP, load the keys in background, then resume it.4 + * + * The improtat thing about this function is that it can fail! If keys will + * still be swapped when the client is resumed, a few of key lookups will + * just block loading keys from disk. */ +#if 0 +static void blockClientOnSwappedKeys(redisClient *c) { +} +#endif + /* ================================= Debugging ============================== */ static void debugCommand(redisClient *c) { diff --git a/redis.tcl b/redis.tcl index 8ad96f45..d52957ad 100644 --- a/redis.tcl +++ b/redis.tcl @@ -20,7 +20,7 @@ array set ::redis::multibulkarg {} # Flag commands requiring last argument as a bulk write operation foreach redis_bulk_cmd { - set setnx rpush lpush lset lrem sadd srem sismember echo getset smove zadd zrem zscore rpoplpush zincrby + set setnx rpush lpush lset lrem sadd srem sismember echo getset smove zadd zrem zscore zincrby } { set ::redis::bulkarg($redis_bulk_cmd) {} } diff --git a/staticsymbols.h b/staticsymbols.h index e59e87ba..7100cd89 100644 --- a/staticsymbols.h +++ b/staticsymbols.h @@ -206,7 +206,7 @@ static struct redisFunctionSym symsTable[] = { {"tryResizeHashTables",(unsigned long)tryResizeHashTables}, {"ttlCommand",(unsigned long)ttlCommand}, {"typeCommand",(unsigned long)typeCommand}, -{"unblockClient",(unsigned long)unblockClient}, +{"unblockClientWaitingData",(unsigned long)unblockClientWaitingData}, {"unlockThreadedIO",(unsigned long)unlockThreadedIO}, {"updateSlavesWaitingBgsave",(unsigned long)updateSlavesWaitingBgsave}, {"vmCanSwapOut",(unsigned long)vmCanSwapOut},