From b5197f1fc9d6fde776168951094e44d5e8742a89 Mon Sep 17 00:00:00 2001
From: Guy Benoish <guy.benoish@redislabs.com>
Date: Wed, 20 Jun 2018 14:40:18 +0700
Subject: [PATCH] Enhance RESTORE with RDBv9 new features

RESTORE now supports:
1. Setting LRU/LFU
2. Absolute-time TTL

Other related changes:
1. RDB loading will not override LRU bits when RDB file
   does not contain the LRU opcode.
2. RDB loading will not set LRU/LFU bits if the server's
   maxmemory-policy does not match.
---
 src/cluster.c       | 34 +++++++++++++++++++++++++++++++---
 src/object.c        | 26 ++++++++++++++++++++++++++
 src/rdb.c           | 27 ++++++++-------------------
 src/server.h        |  2 ++
 tests/unit/dump.tcl | 33 +++++++++++++++++++++++++++++++++
 5 files changed, 100 insertions(+), 22 deletions(-)

diff --git a/src/cluster.c b/src/cluster.c
index 961241d4..dac03ec7 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -4835,15 +4835,39 @@ void dumpCommand(client *c) {
 
 /* RESTORE key ttl serialized-value [REPLACE] */
 void restoreCommand(client *c) {
-    long long ttl;
+    long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock;
     rio payload;
-    int j, type, replace = 0;
+    int j, type, replace = 0, absttl = 0;
     robj *obj;
 
     /* Parse additional options */
     for (j = 4; j < c->argc; j++) {
+        int additional = c->argc-j-1;
         if (!strcasecmp(c->argv[j]->ptr,"replace")) {
             replace = 1;
+        } else if (!strcasecmp(c->argv[j]->ptr,"absttl")) {
+            absttl = 1;
+        } else if (!strcasecmp(c->argv[j]->ptr,"idletime") && additional >= 1 &&
+                   lfu_freq == -1)
+        {
+            if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lru_idle,NULL)
+                    != C_OK) return;
+            if (lru_idle < 0) {
+                addReplyError(c,"Invalid IDLETIME value, must be >= 0");
+                return;
+            }
+            lru_clock = LRU_CLOCK();
+            j++; /* Consume additional arg. */
+        } else if (!strcasecmp(c->argv[j]->ptr,"freq") && additional >= 1 &&
+                   lru_idle == -1)
+        {
+            if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lfu_freq,NULL)
+                    != C_OK) return;
+            if (lfu_freq < 0 || lfu_freq > 255) {
+                addReplyError(c,"Invalid FREQ value, must be >= 0 and <= 255");
+                return;
+            }
+            j++; /* Consume additional arg. */
         } else {
             addReply(c,shared.syntaxerr);
             return;
@@ -4884,7 +4908,11 @@ void restoreCommand(client *c) {
 
     /* Create the key and set the TTL if any */
     dbAdd(c->db,c->argv[1],obj);
-    if (ttl) setExpire(c,c->db,c->argv[1],mstime()+ttl);
+    if (ttl) {
+        if (!absttl) ttl+=mstime();
+        setExpire(c,c->db,c->argv[1],ttl);
+    }
+    objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock);
     signalModifiedKey(c->db,c->argv[1]);
     addReply(c,shared.ok);
     server.dirty++;
diff --git a/src/object.c b/src/object.c
index a77f2aed..89166582 100644
--- a/src/object.c
+++ b/src/object.c
@@ -1166,6 +1166,32 @@ sds getMemoryDoctorReport(void) {
     return s;
 }
 
+/* Set the object LRU/LFU depending on server.maxmemory_policy.
+ * The lfu_freq arg is only relevant if policy is MAXMEMORY_FLAG_LFU.
+ * The lru_idle and lru_clock args are only relevant if policy 
+ * is MAXMEMORY_FLAG_LRU.
+ * Either or both of them may be <0, in that case, nothing is set. */
+void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
+                       long long lru_clock) {
+    if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
+        if (lfu_freq >= 0) {
+            serverAssert(lfu_freq <= 255);
+            val->lru = (LFUGetTimeInMinutes()<<8) | lfu_freq;
+        }
+    } else if (lru_idle >= 0) {
+        /* Serialized LRU idle time is in seconds. Scale
+         * according to the LRU clock resolution this Redis
+         * instance was compiled with (normally 1000 ms, so the
+         * below statement will expand to lru_idle*1000/1000. */
+        lru_idle = lru_idle*1000/LRU_CLOCK_RESOLUTION;
+        val->lru = lru_clock - lru_idle;
+        /* If the lru field overflows (since LRU it is a wrapping
+         * clock), the best we can do is to provide the maximum
+         * representable idle time. */
+        if (val->lru < 0) val->lru = lru_clock+1;
+    }
+}
+
 /* ======================= The OBJECT and MEMORY commands =================== */
 
 /* This is a helper function for the OBJECT command. We need to lookup keys
diff --git a/src/rdb.c b/src/rdb.c
index e37e2f94..d14c3bd2 100644
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -1871,11 +1871,9 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
     }
 
     /* Key-specific attributes, set by opcodes before the key type. */
-    long long expiretime = -1, now = mstime();
+    long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
     long long lru_clock = LRU_CLOCK();
-    uint64_t lru_idle = -1;
-    int lfu_freq = -1;
-
+    
     while(1) {
         robj *key, *val;
 
@@ -1903,7 +1901,9 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
             continue; /* Read next opcode. */
         } else if (type == RDB_OPCODE_IDLE) {
             /* IDLE: LRU idle time. */
-            if ((lru_idle = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr;
+            uint64_t qword;
+            if ((qword = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr;
+            lru_idle = qword;
             continue; /* Read next opcode. */
         } else if (type == RDB_OPCODE_EOF) {
             /* EOF: End of file, exit the main loop. */
@@ -2022,20 +2022,9 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
 
             /* Set the expire time if needed */
             if (expiretime != -1) setExpire(NULL,db,key,expiretime);
-            if (lfu_freq != -1) {
-                val->lru = (LFUGetTimeInMinutes()<<8) | lfu_freq;
-            } else {
-                /* LRU idle time loaded from RDB is in seconds. Scale
-                 * according to the LRU clock resolution this Redis
-                 * instance was compiled with (normaly 1000 ms, so the
-                 * below statement will expand to lru_idle*1000/1000. */
-                lru_idle = lru_idle*1000/LRU_CLOCK_RESOLUTION;
-                val->lru = lru_clock - lru_idle;
-                /* If the lru field overflows (since LRU it is a wrapping
-                 * clock), the best we can do is to provide the maxium
-                 * representable idle time. */
-                if (val->lru < 0) val->lru = lru_clock+1;
-            }
+            
+            /* Set usage information (for eviction). */
+            objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock);
 
             /* Decrement the key refcount since dbAdd() will take its
              * own reference. */
diff --git a/src/server.h b/src/server.h
index f7aff36b..a130d5b1 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1772,6 +1772,8 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply);
 robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags);
 robj *objectCommandLookup(client *c, robj *key);
 robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply);
+void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
+                       long long lru_clock);
 #define LOOKUP_NONE 0
 #define LOOKUP_NOTOUCH (1<<0)
 void dbAdd(redisDb *db, robj *key, robj *val);
diff --git a/tests/unit/dump.tcl b/tests/unit/dump.tcl
index 8bb0165c..3e6051fe 100644
--- a/tests/unit/dump.tcl
+++ b/tests/unit/dump.tcl
@@ -25,6 +25,39 @@ start_server {tags {"dump"}} {
         assert {$ttl >= (2569591501-3000) && $ttl <= 2569591501}
         r get foo
     } {bar}
+    
+    test {RESTORE can set an absolute expire} {
+        r set foo bar
+        set encoded [r dump foo]
+        r del foo
+        set now [clock milliseconds]
+        r restore foo [expr $now+3000] $encoded absttl
+        set ttl [r pttl foo]
+        assert {$ttl >= 2998 && $ttl <= 3000}
+        r get foo
+    } {bar}
+    
+    test {RESTORE can set LRU} {
+        r set foo bar
+        set encoded [r dump foo]
+        r del foo
+        r config set maxmemory-policy allkeys-lru
+        r restore foo 0 $encoded idletime 1000
+        set idle [r object idletime foo]
+        assert {$idle >= 1000 && $idle <= 1002}
+        r get foo
+    } {bar}
+    
+    test {RESTORE can set LFU} {
+        r set foo bar
+        set encoded [r dump foo]
+        r del foo
+        r config set maxmemory-policy allkeys-lfu
+        r restore foo 0 $encoded freq 100
+        set freq [r object freq foo]
+        assert {$freq == 100}
+        r get foo
+    } {bar}
 
     test {RESTORE returns an error of the key already exists} {
         r set foo bar