diff --git a/src/Makefile b/src/Makefile index 86e0b3fe..b896b126 100644 --- a/src/Makefile +++ b/src/Makefile @@ -144,7 +144,7 @@ endif REDIS_SERVER_NAME=redis-server REDIS_SENTINEL_NAME=redis-sentinel -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.c REDIS_CLI_NAME=redis-cli REDIS_CLI_OBJ=anet.o adlist.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o REDIS_BENCHMARK_NAME=redis-benchmark diff --git a/src/listpack.c b/src/listpack.c new file mode 100644 index 00000000..e2702b65 --- /dev/null +++ b/src/listpack.c @@ -0,0 +1,783 @@ +/* Listpack -- A lists of strings serialization format + * + * This file implements the specification you can find at: + * + * https://github.com/antirez/listpack + * + * Copyright (c) 2017, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include +#include + +#include "listpack.h" +#include "listpack_malloc.h" + +#define LP_HDR_SIZE 6 /* 32 bit total len + 16 bit number of elements. */ +#define LP_HDR_NUMELE_UNKNOWN UINT16_MAX +#define LP_MAX_INT_ENCODING_LEN 9 +#define LP_MAX_BACKLEN_SIZE 5 +#define LP_MAX_ENTRY_BACKLEN 34359738367ULL +#define LP_ENCODING_INT 0 +#define LP_ENCODING_STRING 1 + +#define LP_ENCODING_7BIT_UINT 0 +#define LP_ENCODING_7BIT_UINT_MASK 0x80 +#define LP_ENCODING_IS_7BIT_UINT(byte) (((byte)&LP_ENCODING_7BIT_UINT_MASK)==LP_ENCODING_7BIT_UINT) + +#define LP_ENCODING_6BIT_STR 0x80 +#define LP_ENCODING_6BIT_STR_MASK 0xC0 +#define LP_ENCODING_IS_6BIT_STR(byte) (((byte)&LP_ENCODING_6BIT_STR_MASK)==LP_ENCODING_6BIT_STR) + +#define LP_ENCODING_13BIT_INT 0xC0 +#define LP_ENCODING_13BIT_INT_MASK 0xE0 +#define LP_ENCODING_IS_13BIT_INT(byte) (((byte)&LP_ENCODING_13BIT_INT_MASK)==LP_ENCODING_13BIT_INT) + +#define LP_ENCODING_12BIT_STR 0xE0 +#define LP_ENCODING_12BIT_STR_MASK 0xF0 +#define LP_ENCODING_IS_12BIT_STR(byte) (((byte)&LP_ENCODING_12BIT_STR_MASK)==LP_ENCODING_12BIT_STR) + +#define LP_ENCODING_16BIT_INT 0xF1 +#define LP_ENCODING_16BIT_INT_MASK 0xFF +#define LP_ENCODING_IS_16BIT_INT(byte) (((byte)&LP_ENCODING_16BIT_INT_MASK)==LP_ENCODING_16BIT_INT) + +#define LP_ENCODING_24BIT_INT 0xF2 +#define LP_ENCODING_24BIT_INT_MASK 0xFF +#define LP_ENCODING_IS_24BIT_INT(byte) (((byte)&LP_ENCODING_24BIT_INT_MASK)==LP_ENCODING_24BIT_INT) + +#define LP_ENCODING_32BIT_INT 0xF3 +#define LP_ENCODING_32BIT_INT_MASK 0xFF +#define LP_ENCODING_IS_32BIT_INT(byte) (((byte)&LP_ENCODING_32BIT_INT_MASK)==LP_ENCODING_32BIT_INT) + +#define LP_ENCODING_64BIT_INT 0xF4 +#define LP_ENCODING_64BIT_INT_MASK 0xFF +#define LP_ENCODING_IS_64BIT_INT(byte) (((byte)&LP_ENCODING_64BIT_INT_MASK)==LP_ENCODING_64BIT_INT) + +#define LP_ENCODING_32BIT_STR 0xF0 +#define LP_ENCODING_32BIT_STR_MASK 0xFF +#define LP_ENCODING_IS_32BIT_STR(byte) (((byte)&LP_ENCODING_32BIT_STR_MASK)==LP_ENCODING_32BIT_STR) + +#define LP_EOF 0xFF + +#define LP_ENCODING_6BIT_STR_LEN(p) ((p)[0] & 0x3F) +#define LP_ENCODING_12BIT_STR_LEN(p) ((((p)[0] & 0xF) << 8) | (p)[1]) +#define LP_ENCODING_32BIT_STR_LEN(p) (((uint32_t)(p)[1]<<0) | \ + ((uint32_t)(p)[2]<<8) | \ + ((uint32_t)(p)[3]<<16) | \ + ((uint32_t)(p)[4]<<24)) + +#define lpGetTotalBytes(p) (((uint32_t)(p)[0]<<0) | \ + ((uint32_t)(p)[1]<<8) | \ + ((uint32_t)(p)[2]<<16) | \ + ((uint32_t)(p)[3]<<24)) + +#define lpGetNumElements(p) (((uint32_t)(p)[4]<<0) | \ + ((uint32_t)(p)[5]<<8)) +#define lpSetTotalBytes(p,v) do { \ + (p)[0] = (v)&0xff; \ + (p)[1] = ((v)>>8)&0xff; \ + (p)[2] = ((v)>>16)&0xff; \ + (p)[3] = ((v)>>24)&0xff; \ +} while(0) + +#define lpSetNumElements(p,v) do { \ + (p)[4] = (v)&0xff; \ + (p)[5] = ((v)>>8)&0xff; \ +} while(0) + +/* Convert a string into a signed 64 bit integer. + * The function returns 1 if the string could be parsed into a (non-overflowing) + * signed 64 bit int, 0 otherwise. The 'value' will be set to the parsed value + * when the function returns success. + * + * Note that this function demands that the string strictly represents + * a int64 value: no spaces or other characters before or after the string + * representing the number are accepted, nor zeroes at the start if not + * for the string "0" representing the zero number. + * + * Because of its strictness, it is safe to use this function to check if + * you can convert a string into a long long, and obtain back the string + * from the number without any loss in the string representation. * + * + * ----------------------------------------------------------------------------- + * + * Credits: this function was adapted from the Redis source code, file + * "utils.c", function string2ll(), and is copyright: + * + * Copyright(C) 2011, Pieter Noordhuis + * Copyright(C) 2011, Salvatore Sanfilippo + * + * The function is released under the BSD 3-clause license. + */ +int lpStringToInt64(const char *s, unsigned long slen, int64_t *value) { + const char *p = s; + unsigned long plen = 0; + int negative = 0; + uint64_t v; + + if (plen == slen) + return 0; + + /* Special case: first and only digit is 0. */ + if (slen == 1 && p[0] == '0') { + if (value != NULL) *value = 0; + return 1; + } + + if (p[0] == '-') { + negative = 1; + p++; plen++; + + /* Abort on only a negative sign. */ + if (plen == slen) + return 0; + } + + /* First digit should be 1-9, otherwise the string should just be 0. */ + if (p[0] >= '1' && p[0] <= '9') { + v = p[0]-'0'; + p++; plen++; + } else if (p[0] == '0' && slen == 1) { + *value = 0; + return 1; + } else { + return 0; + } + + while (plen < slen && p[0] >= '0' && p[0] <= '9') { + if (v > (UINT64_MAX / 10)) /* Overflow. */ + return 0; + v *= 10; + + if (v > (UINT64_MAX - (p[0]-'0'))) /* Overflow. */ + return 0; + v += p[0]-'0'; + + p++; plen++; + } + + /* Return if not all bytes were used. */ + if (plen < slen) + return 0; + + if (negative) { + if (v > ((uint64_t)(-(INT64_MIN+1))+1)) /* Overflow. */ + return 0; + if (value != NULL) *value = -v; + } else { + if (v > INT64_MAX) /* Overflow. */ + return 0; + if (value != NULL) *value = v; + } + return 1; +} + +/* Create a new, empty listpack. + * On success the new listpack is returned, otherwise an error is returned. */ +unsigned char *lpNew(void) { + unsigned char *lp = lp_malloc(LP_HDR_SIZE+1); + if (lp == NULL) return NULL; + lpSetTotalBytes(lp,LP_HDR_SIZE+1); + lpSetNumElements(lp,0); + lp[LP_HDR_SIZE] = LP_EOF; + return lp; +} + +/* Free the specified listpack. */ +void lpFree(unsigned char *lp) { + lp_free(lp); +} + +/* Given an element 'ele' of size 'size', determine if the element can be + * represented inside the listpack encoded as integer, and returns + * LP_ENCODING_INT if so. Otherwise returns LP_ENCODING_STR if no integer + * encoding is possible. + * + * If the LP_ENCODING_INT is returned, the function stores the integer encoded + * representation of the element in the 'intenc' buffer. + * + * Regardless of the returned encoding, 'enclen' is populated by reference to + * the number of bytes that the string or integer encoded element will require + * in order to be represented. */ +int lpEncodeGetType(unsigned char *ele, uint32_t size, unsigned char *intenc, uint64_t *enclen) { + int64_t v; + if (lpStringToInt64((const char*)ele, size, &v)) { + if (v >= 0 && v <= 127) { + /* Single byte 0-127 integer. */ + intenc[0] = v; + *enclen = 1; + } else if (v >= -4096 && v <= 4095) { + /* 13 bit integer. */ + if (v < 0) v = ((int64_t)1<<13)+v; + intenc[0] = (v>>8)|LP_ENCODING_13BIT_INT; + intenc[1] = v&0xff; + *enclen = 2; + } else if (v >= -32768 && v <= 32767) { + /* 16 bit integer. */ + if (v < 0) v = ((int64_t)1<<16)+v; + intenc[0] = LP_ENCODING_16BIT_INT; + intenc[1] = v&0xff; + intenc[2] = v>>8; + *enclen = 3; + } else if (v >= -8388608 && v <= 8388607) { + /* 24 bit integer. */ + if (v < 0) v = ((int64_t)1<<24)+v; + intenc[0] = LP_ENCODING_24BIT_INT; + intenc[1] = v&0xff; + intenc[2] = (v>>8)&0xff; + intenc[3] = v>>16; + *enclen = 4; + } else if (v >= -2147483648 && v <= 2147483647) { + /* 32 bit integer. */ + if (v < 0) v = ((int64_t)1<<32)+v; + intenc[0] = LP_ENCODING_32BIT_INT; + intenc[1] = v&0xff; + intenc[2] = (v>>8)&0xff; + intenc[3] = (v>>16)&0xff; + intenc[4] = v>>24; + *enclen = 5; + } else { + /* 64 bit integer. */ + uint64_t uv = v; + intenc[0] = LP_ENCODING_64BIT_INT; + intenc[1] = uv&0xff; + intenc[2] = (uv>>8)&0xff; + intenc[3] = (uv>>16)&0xff; + intenc[4] = (uv>>24)&0xff; + intenc[5] = (uv>>32)&0xff; + intenc[6] = (uv>>40)&0xff; + intenc[7] = (uv>>48)&0xff; + intenc[8] = uv>>56; + *enclen = 9; + } + return LP_ENCODING_INT; + } else { + if (size < 64) *enclen = 1+size; + else if (size < 4096) *enclen = 2+size; + else *enclen = 4+size; + return LP_ENCODING_STRING; + } +} + +/* Store a reverse-encoded variable length field, representing the length + * of the previous element of size 'l', in the target buffer 'buf'. + * The function returns the number of bytes used to encode it, from + * 1 to 5. If 'buf' is NULL the funciton just returns the number of bytes + * needed in order to encode the backlen. */ +unsigned long lpEncodeBacklen(unsigned char *buf, uint64_t l) { + if (l <= 127) { + if (buf) buf[0] = l; + return 1; + } else if (l < 16383) { + if (buf) { + buf[0] = l>>7; + buf[1] = (l&127)|128; + } + return 2; + } else if (l < 2097151) { + if (buf) { + buf[0] = l>>14; + buf[1] = ((l>>7)&127)|128; + buf[2] = (l&127)|128; + } + return 3; + } else if (l < 268435455) { + if (buf) { + buf[0] = l>>21; + buf[1] = ((l>>14)&127)|128; + buf[2] = ((l>>7)&127)|128; + buf[3] = (l&127)|128; + } + return 4; + } else { + if (buf) { + buf[0] = l>>28; + buf[1] = ((l>>21)&127)|128; + buf[2] = ((l>>14)&127)|128; + buf[3] = ((l>>7)&127)|128; + buf[4] = (l&127)|128; + } + return 5; + } +} + +/* Decode the backlen and returns it. If the encoding looks invalid (more than + * 5 bytes are used), UINT64_MAX is returned to report the problem. */ +uint64_t lpDecodeBacklen(unsigned char *p) { + uint64_t val = 0; + uint64_t shift = 0; + do { + val |= (uint64_t)(p[0] & 127) << shift; + if (!(p[0] & 128)) break; + shift += 7; + p--; + if (shift > 28) return UINT64_MAX; + } while(1); + return val; +} + +/* Encode the string element pointed by 's' of size 'len' in the target + * buffer 's'. The function should be called with 'buf' having always enough + * space for encoding the string. This is done by calling lpEncodeGetType() + * before calling this function. */ +void lpEncodeString(unsigned char *buf, unsigned char *s, uint32_t len) { + if (len < 64) { + buf[0] = len | LP_ENCODING_6BIT_STR; + memcpy(buf+1,s,len); + } else if (len < 4096) { + buf[0] = (len >> 8) | LP_ENCODING_12BIT_STR; + buf[1] = len & 0xff; + memcpy(buf+2,s,len); + } else { + buf[0] = LP_ENCODING_32BIT_STR; + buf[1] = len & 0xff; + buf[2] = (len >> 8) & 0xff; + buf[3] = (len >> 16) & 0xff; + buf[4] = (len >> 24) & 0xff; + memcpy(buf+4,s,len); + } +} + +/* Return the encoded length of the listpack element pointed by 'p'. If the + * element encoding is wrong then 0 is returned. */ +uint32_t lpCurrentEncodedSize(unsigned char *p) { + if (LP_ENCODING_IS_7BIT_UINT(p[0])) return 1; + if (LP_ENCODING_IS_6BIT_STR(p[0])) return 1+LP_ENCODING_6BIT_STR_LEN(p); + if (LP_ENCODING_IS_13BIT_INT(p[0])) return 2; + if (LP_ENCODING_IS_16BIT_INT(p[0])) return 3; + if (LP_ENCODING_IS_24BIT_INT(p[0])) return 4; + if (LP_ENCODING_IS_32BIT_INT(p[0])) return 5; + if (LP_ENCODING_IS_64BIT_INT(p[0])) return 9; + if (LP_ENCODING_IS_12BIT_STR(p[0])) return 2+LP_ENCODING_12BIT_STR_LEN(p); + if (LP_ENCODING_IS_32BIT_STR(p[0])) return 5+LP_ENCODING_32BIT_STR_LEN(p); + if (p[0] == LP_EOF) return 1; + return 0; +} + +/* Skip the current entry returning the next. It is invalid to call this + * function if the current element is the EOF element at the end of the + * listpack, however, while this function is used to implement lpNext(), + * it does not return NULL when the EOF element is encountered. */ +unsigned char *lpSkip(unsigned char *p) { + unsigned long entrylen = lpCurrentEncodedSize(p); + entrylen += lpEncodeBacklen(NULL,entrylen); + p += entrylen; + return p; +} + +/* If 'p' points to an element of the listpack, calling lpNext() will return + * the pointer to the next element (the one on the right), or NULL if 'p' + * already pointed to the last element of the listpack. */ +unsigned char *lpNext(unsigned char *lp, unsigned char *p) { + ((void) lp); /* lp is not used for now. However lpPrev() uses it. */ + p = lpSkip(p); + if (p[0] == LP_EOF) return NULL; + return p; +} + +/* If 'p' points to an element of the listpack, calling lpPrev() will return + * the pointer to the preivous element (the one on the left), or NULL if 'p' + * already pointed to the first element of the listpack. */ +unsigned char *lpPrev(unsigned char *lp, unsigned char *p) { + if (p-lp == LP_HDR_SIZE) return NULL; + p--; /* Seek the first backlen byte of the last element. */ + uint64_t prevlen = lpDecodeBacklen(p); + prevlen += lpEncodeBacklen(NULL,prevlen); + return p-prevlen+1; /* Seek the first byte of the previous entry. */ +} + +/* Return a pointer to the first element of the listpack, or NULL if the + * listpack has no elements. */ +unsigned char *lpFirst(unsigned char *lp) { + lp += LP_HDR_SIZE; /* Skip the header. */ + if (lp[0] == LP_EOF) return NULL; + return lp; +} + +/* Return a pointer to the last element of the listpack, or NULL if the + * listpack has no elements. */ +unsigned char *lpLast(unsigned char *lp) { + unsigned char *p = lp+lpGetTotalBytes(lp)-1; /* Seek EOF element. */ + return lpPrev(lp,p); /* Will return NULL if EOF is the only element. */ +} + +/* Return the number of elements inside the listpack. This function attempts + * to use the cached value when within range, otherwise a full scan is + * needed. As a side effect of calling this function, the listpack header + * could be modified, because if the count is found to be already within + * the 'numele' header field range, the new value is set. */ +uint32_t lpLength(unsigned char *lp) { + uint32_t numele = lpGetNumElements(lp); + if (numele != LP_HDR_NUMELE_UNKNOWN) return numele; + + /* Too many elements inside the listpack. We need to scan in order + * to get the total number. */ + uint32_t count = 0; + unsigned char *p = lpFirst(lp); + while(p) { + count++; + p = lpNext(lp,p); + } + + /* If the count is again within range of the header numele field, + * set it. */ + if (count < LP_HDR_NUMELE_UNKNOWN) lpSetNumElements(lp,count); + return count; +} + +/* Return the listpack element pointed by 'p'. + * + * The function changes behavior depending on the passed 'intbuf' value. + * Specifically, if 'intbuf' is NULL: + * + * If the element is internally encoded as an integer, the function returns + * NULL and populates the integer value by reference in 'count'. Otherwise if + * the element is encoded as a string a pointer to the string (pointing inside + * the listpack itself) is returned, and 'count' is set to the length of the + * string. + * + * If instead 'intbuf' points to a buffer passed by the caller, that must be + * at least LP_INTBUF_SIZE bytes, the function always returns the element as + * it was a string (returning the pointer to the string and setting the + * 'count' argument to the string length by reference). However if the element + * is encoded as an integer, the 'intbuf' buffer is used in order to store + * the string representation. + * + * The user should use one or the other form depending on what the value will + * be used for. If there is immediate usage for an integer value returned + * by the function, than to pass a buffer (and convert it back to a number) + * is of course useless. + * + * If the function is called against a badly encoded ziplist, so that there + * is no valid way to parse it, the function returns like if there was an + * integer encoded with value 12345678900000000 + , this may + * be an hint to understand that something is wrong. To crash in this case is + * not sensible because of the different requirements of the application using + * this lib. + * + * Similarly, there is no error returned since the listpack normally can be + * assumed to be valid, so that would be a very high API cost. However a function + * in order to check the integrity of the listpack at load time is provided, + * check lpIsValid(). */ +unsigned char *lpGet(unsigned char *p, int64_t *count, unsigned char *intbuf) { + int64_t val; + uint64_t uval, negstart, negmax; + + if (LP_ENCODING_IS_7BIT_UINT(p[0])) { + negstart = UINT64_MAX; /* 7 bit ints are always positive. */ + negmax = 0; + uval = p[0] & 0x7f; + } else if (LP_ENCODING_IS_6BIT_STR(p[0])) { + *count = LP_ENCODING_6BIT_STR_LEN(p); + return p+1; + } else if (LP_ENCODING_IS_13BIT_INT(p[0])) { + uval = ((p[0]&0x1f)<<8) | p[1]; + negstart = (uint64_t)1<<12; + negmax = 8191; + } else if (LP_ENCODING_IS_16BIT_INT(p[0])) { + uval = (uint64_t)p[1] | + (uint64_t)p[2]<<8; + negstart = (uint64_t)1<<15; + negmax = UINT16_MAX; + } else if (LP_ENCODING_IS_24BIT_INT(p[0])) { + uval = (uint64_t)p[1] | + (uint64_t)p[2]<<8 | + (uint64_t)p[3]<<16; + negstart = (uint64_t)1<<23; + negmax = UINT32_MAX>>8; + } else if (LP_ENCODING_IS_32BIT_INT(p[0])) { + uval = (uint64_t)p[1] | + (uint64_t)p[2]<<8 | + (uint64_t)p[3]<<16 | + (uint64_t)p[4]<<24; + negstart = (uint64_t)1<<31; + negmax = UINT32_MAX; + } else if (LP_ENCODING_IS_64BIT_INT(p[0])) { + uval = (uint64_t)p[1] | + (uint64_t)p[2]<<8 | + (uint64_t)p[3]<<16 | + (uint64_t)p[4]<<24 | + (uint64_t)p[5]<<32 | + (uint64_t)p[6]<<40 | + (uint64_t)p[7]<<48 | + (uint64_t)p[8]<<56; + negstart = (uint64_t)1<<63; + negmax = UINT64_MAX; + } else if (LP_ENCODING_IS_12BIT_STR(p[0])) { + *count = LP_ENCODING_12BIT_STR_LEN(p); + return p+2; + } else if (LP_ENCODING_IS_32BIT_STR(p[0])) { + *count = LP_ENCODING_32BIT_STR_LEN(p); + return p+5; + } else { + uval = 12345678900000000ULL + p[0]; + negstart = UINT64_MAX; + negmax = 0; + } + + /* We reach this code path only for integer encodings. + * Convert the unsigned value to the signed one using two's complement + * rule. */ + if (uval >= negstart) { + /* This three steps conversion should avoid undefined behaviors + * in the unsigned -> signed conversion. */ + uval = negmax-uval; + val = uval; + val = -val-1; + } else { + val = uval; + } + + /* Return the string representation of the integer or the value itself + * depending on intbuf being NULL or not. */ + if (intbuf) { + *count = snprintf((char*)intbuf,LP_INTBUF_SIZE,"%lld",val); + return intbuf; + } else { + *count = val; + return NULL; + } +} + +/* Insert, delete or replace the specified element 'ele' of lenght 'len' at + * the specified position 'p', with 'p' being a listpack element pointer + * obtained with lpFirst(), lpLast(), lpIndex(), lpNext(), lpPrev() or + * lpSeek(). + * + * The element is inserted before, after, or replaces the element pointed + * by 'p' depending on the 'where' argument, that can be LP_BEFORE, LP_AFTER + * or LP_REPLACE. + * + * If 'ele' is set to NULL, the function removes the element pointed by 'p' + * instead of inserting one. + * + * Returns NULL on out of memory or when the listpack total length would exceed + * the max allowed size of 2^32-1, otherwise the new pointer to the listpack + * holding the new element is returned (and the old pointer passed is no longer + * considered valid) + * + * If 'newp' is not NULL, at the end of a successful call '*newp' will be set + * to the address of the element just added, so that it will be possible to + * continue an interation with lpNext() and lpPrev(). + * + * For deletion operations ('ele' set to NULL) 'newp' is set to the next + * element, on the right of the deleted one, or to NULL if the deleted element + * was the last one. */ +unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, unsigned char *p, int where, unsigned char **newp) { + unsigned char intenc[LP_MAX_INT_ENCODING_LEN]; + unsigned char backlen[LP_MAX_BACKLEN_SIZE]; + + uint64_t enclen; /* The length of the encoded element. */ + + /* An element pointer set to NULL means deletion, which is conceptually + * replacing the element with a zero-length element. So whatever we + * get passed as 'where', set it to LP_REPLACE. */ + if (ele == NULL) where = LP_REPLACE; + + /* If we need to insert after the current element, we just jump to the + * next element (that could be the EOF one) and handle the case of + * inserting before. So the function will actually deal with just two + * cases: LP_BEFORE and LP_REPLACE. */ + if (where == LP_AFTER) { + p = lpSkip(p); + where = LP_BEFORE; + } + + /* Store the offset of the element 'p', so that we can obtain its + * address again after a reallocation. */ + unsigned long poff = p-lp; + + /* Calling lpEncodeGetType() results into the encoded version of the + * element to be stored into 'intenc' in case it is representable as + * an integer: in that case, the function returns LP_ENCODING_INT. + * Otherwise if LP_ENCODING_STR is returned, we'll have to call + * lpEncodeString() to actually write the encoded string on place later. + * + * Whatever the returned encoding is, 'enclen' is populated with the + * length of the encoded element. */ + int enctype; + if (ele) { + enctype = lpEncodeGetType(ele,size,intenc,&enclen); + } else { + enctype = -1; + enclen = 0; + } + + /* We need to also encode the backward-parsable length of the element + * and append it to the end: this allows to traverse the listpack from + * the end to the start. */ + unsigned long backlen_size = ele ? lpEncodeBacklen(backlen,enclen) : 0; + uint64_t old_listpack_bytes = lpGetTotalBytes(lp); + uint32_t replaced_len = 0; + if (where == LP_REPLACE) { + replaced_len = lpCurrentEncodedSize(p); + replaced_len += lpEncodeBacklen(NULL,replaced_len); + } + + uint64_t new_listpack_bytes = old_listpack_bytes + enclen + backlen_size + - replaced_len; + if (new_listpack_bytes > UINT32_MAX) return NULL; + + /* We now need to reallocate in order to make space or shrink the + * allocation (in case 'when' value is LP_REPLACE and the new element is + * smaller). However we do that before memmoving the memory to + * make room for the new element if the final allocation will get + * larger, or we do it after if the final allocation will get smaller. */ + + unsigned char *dst = lp + poff; /* May be updated after reallocation. */ + + /* Realloc before: we need more room. */ + if (new_listpack_bytes > old_listpack_bytes) { + if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL; + dst = lp + poff; + } + + /* Setup the listpack relocating the elements to make the exact room + * we need to store the new one. */ + if (where == LP_BEFORE) { + memmove(dst+enclen+backlen_size,dst,old_listpack_bytes-poff); + } else { /* LP_REPLACE. */ + long lendiff = (enclen+backlen_size)-replaced_len; + memmove(dst+replaced_len+lendiff, + dst+replaced_len, + old_listpack_bytes-poff-replaced_len); + } + + /* Realloc after: we need to free space. */ + if (new_listpack_bytes < old_listpack_bytes) { + if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL; + dst = lp + poff; + } + + /* Store the entry. */ + if (newp) { + *newp = dst; + /* In case of deletion, set 'newp' to NULL if the next element is + * the EOF element. */ + if (!ele && dst[0] == LP_EOF) *newp = NULL; + } + if (ele) { + if (enctype == LP_ENCODING_INT) { + memcpy(dst,intenc,enclen); + } else { + lpEncodeString(dst,ele,size); + } + dst += enclen; + memcpy(dst,backlen,backlen_size); + dst += backlen_size; + } + + /* Update header. */ + if (where != LP_REPLACE || ele == NULL) { + uint32_t num_elements = lpGetNumElements(lp); + if (num_elements != LP_HDR_NUMELE_UNKNOWN) { + if (ele) + lpSetNumElements(lp,num_elements+1); + else + lpSetNumElements(lp,num_elements-1); + } + } + lpSetTotalBytes(lp,new_listpack_bytes); + return lp; +} + +/* Append the specified element 'ele' of lenght 'len' at the end of the + * listpack. It is implemented in terms of lpInsert(), so the return value is + * the same as lpInsert(). */ +unsigned char *lpAppend(unsigned char *lp, unsigned char *ele, uint32_t size) { + uint64_t listpack_bytes = lpGetTotalBytes(lp); + unsigned char *eofptr = lp + listpack_bytes - 1; + return lpInsert(lp,ele,size,eofptr,LP_BEFORE,NULL); +} + +/* Remove the element pointed by 'p', and return the resulting listpack. + * If 'newp' is not NULL, the next element pointer (to the right of the + * deleted one) is returned by reference. If the deleted element was the + * last one, '*newp' is set to NULL. */ +unsigned char *lpDelete(unsigned char *lp, unsigned char *p, unsigned char **newp) { + return lpInsert(lp,NULL,0,p,LP_REPLACE,newp); +} + +/* Return the total number of bytes the listpack is composed of. */ +uint32_t lpBytes(unsigned char *lp) { + return lpGetTotalBytes(lp); +} + +/* Seek the specified element and returns the pointer to the seeked element. + * Positive indexes specify the zero-based element to seek from the head to + * the tail, negative indexes specify elements starting from the tail, where + * -1 means the last element, -2 the penultimate and so forth. If the index + * is out of range, NULL is returned. */ +unsigned char *lpSeek(unsigned char *lp, long index) { + int forward = 1; /* Seek forward by default. */ + + /* We want to seek from left to right or the other way around + * depending on the listpack length and the element position. + * However if the listpack length cannot be obtained in constant time, + * we always seek from left to right. */ + uint32_t numele = lpGetNumElements(lp); + if (numele != LP_HDR_NUMELE_UNKNOWN) { + if (index < 0) index = (long)numele+index; + if (index < 0) return NULL; /* Index still < 0 means out of range. */ + if (index >= numele) return NULL; /* Out of range the other side. */ + /* We want to scan right-to-left if the element we are looking for + * is past the half of the listpack. */ + if (index > numele/2) { + forward = 0; + /* Left to right scanning always expects a negative index. Convert + * our index to negative form. */ + index -= numele; + } + } else { + /* If the listpack length is unspecified, for negative indexes we + * want to always scan left-to-right. */ + if (index < 0) forward = 0; + } + + /* Forward and backward scanning is trivially based on lpNext()/lpPrev(). */ + if (forward) { + unsigned char *ele = lpFirst(lp); + while (index > 0 && ele) { + ele = lpNext(lp,ele); + index--; + } + return ele; + } else { + unsigned char *ele = lpLast(lp); + while (index < -1 && ele) { + ele = lpPrev(lp,ele); + index++; + } + return ele; + } +} + diff --git a/src/listpack.h b/src/listpack.h new file mode 100644 index 00000000..af67b4b4 --- /dev/null +++ b/src/listpack.h @@ -0,0 +1,61 @@ +/* Listpack -- A lists of strings serialization format + * + * This file implements the specification you can find at: + * + * https://github.com/antirez/listpack + * + * Copyright (c) 2017, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef __LISTPACK_H +#define __LISTPACK_H + +#include + +#define LP_INTBUF_SIZE 21 /* 20 digits of -2^63 + 1 null term = 21. */ + +/* lpInsert() where argument possible values: */ +#define LP_BEFORE 0 +#define LP_AFTER 1 +#define LP_REPLACE 2 + +unsigned char *lpNew(void); +void lpFree(unsigned char *lp); +unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, unsigned char *p, int where, unsigned char **newp); +unsigned char *lpAppend(unsigned char *lp, unsigned char *ele, uint32_t size); +unsigned char *lpDelete(unsigned char *lp, unsigned char *p, unsigned char **newp); +uint32_t lpLength(unsigned char *lp); +unsigned char *lpGet(unsigned char *p, int64_t *count, unsigned char *intbuf); +unsigned char *lpFirst(unsigned char *lp); +unsigned char *lpLast(unsigned char *lp); +unsigned char *lpNext(unsigned char *lp, unsigned char *p); +unsigned char *lpPrev(unsigned char *lp, unsigned char *p); +uint32_t lpBytes(unsigned char *lp); +unsigned char *lpSeek(unsigned char *lp, long index); + +#endif diff --git a/src/listpack_malloc.h b/src/listpack_malloc.h new file mode 100644 index 00000000..a3a077fc --- /dev/null +++ b/src/listpack_malloc.h @@ -0,0 +1,44 @@ +/* Listpack -- A lists of strings serialization format + * https://github.com/antirez/listpack + * + * Copyright (c) 2017, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/* Allocator selection. + * + * This file is used in order to change the Rax allocator at compile time. + * Just define the following defines to what you want to use. Also add + * the include of your alternate allocator if needed (not needed in order + * to use the default libc allocator). */ + +#ifndef LISTPACK_ALLOC_H +#define LISTPACK_ALLOC_H +#define lp_malloc malloc +#define lp_realloc realloc +#define lp_free free +#endif diff --git a/src/object.c b/src/object.c index d2f8d53c..8eeb5c6c 100644 --- a/src/object.c +++ b/src/object.c @@ -232,6 +232,13 @@ robj *createZsetZiplistObject(void) { return o; } +robj *createStreamObject(void) { + stream *s = streamNew(); + robj *o = createObject(OBJ_STREAM,s); + o->encoding = OBJ_ENCODING_STREAM; + return o; +} + robj *createModuleObject(moduleType *mt, void *value) { moduleValue *mv = zmalloc(sizeof(*mv)); mv->type = mt; diff --git a/src/rax.c b/src/rax.c index dda008df..b4f5ae05 100644 --- a/src/rax.c +++ b/src/rax.c @@ -131,7 +131,7 @@ static inline void raxStackFree(raxStack *ts) { } /* ---------------------------------------------------------------------------- - * Radis tree implementation + * Radix tree implementation * --------------------------------------------------------------------------*/ /* Allocate a new non compressed node with the specified number of children. @@ -873,7 +873,8 @@ raxNode *raxRemoveChild(raxNode *parent, raxNode *child) { memmove(((char*)cp)-1,cp,(parent->size-taillen-1)*sizeof(raxNode**)); /* Move the remaining "tail" pointer at the right position as well. */ - memmove(((char*)c)-1,c+1,taillen*sizeof(raxNode**)+parent->iskey*sizeof(void*)); + size_t valuelen = (parent->iskey && !parent->isnull) ? sizeof(void*) : 0; + memmove(((char*)c)-1,c+1,taillen*sizeof(raxNode**)+valuelen); /* 4. Update size. */ parent->size--; @@ -1175,7 +1176,7 @@ void raxIteratorDelChars(raxIterator *it, size_t count) { * The function returns 1 on success or 0 on out of memory. */ int raxIteratorNextStep(raxIterator *it, int noup) { if (it->flags & RAX_ITER_EOF) { - return 0; + return 1; } else if (it->flags & RAX_ITER_JUST_SEEKED) { it->flags &= ~RAX_ITER_JUST_SEEKED; return 1; @@ -1187,10 +1188,6 @@ int raxIteratorNextStep(raxIterator *it, int noup) { size_t orig_stack_items = it->stack.items; raxNode *orig_node = it->node; - /* Clear the EOF flag: it will be set again if the EOF condition - * is still valid. */ - it->flags &= ~RAX_ITER_EOF; - while(1) { int children = it->node->iscompr ? 1 : it->node->size; if (!noup && children) { @@ -1291,7 +1288,7 @@ int raxSeekGreatest(raxIterator *it) { * effect to the one of raxIteratorPrevSte(). */ int raxIteratorPrevStep(raxIterator *it, int noup) { if (it->flags & RAX_ITER_EOF) { - return 0; + return 1; } else if (it->flags & RAX_ITER_JUST_SEEKED) { it->flags &= ~RAX_ITER_JUST_SEEKED; return 1; @@ -1412,6 +1409,7 @@ int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len) { it->node = it->rt->head; if (!raxSeekGreatest(it)) return 0; assert(it->node->iskey); + it->data = raxGetData(it->node); return 1; } @@ -1430,6 +1428,7 @@ int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len) { /* We found our node, since the key matches and we have an * "equal" condition. */ if (!raxIteratorAddChars(it,ele,len)) return 0; /* OOM. */ + it->data = raxGetData(it->node); } else if (lt || gt) { /* Exact key not found or eq flag not set. We have to set as current * key the one represented by the node we stopped at, and perform @@ -1502,6 +1501,7 @@ int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len) { * the previous sub-tree. */ if (nodechar < keychar) { if (!raxSeekGreatest(it)) return 0; + it->data = raxGetData(it->node); } else { if (!raxIteratorAddChars(it,it->node->data,it->node->size)) return 0; @@ -1647,6 +1647,14 @@ void raxStop(raxIterator *it) { raxStackFree(&it->stack); } +/* Return if the iterator is in an EOF state. This happens when raxSeek() + * failed to seek an appropriate element, so that raxNext() or raxPrev() + * will return zero, or when an EOF condition was reached while iterating + * with raxNext() and raxPrev(). */ +int raxEOF(raxIterator *it) { + return it->flags & RAX_ITER_EOF; +} + /* ----------------------------- Introspection ------------------------------ */ /* This function is mostly used for debugging and learning purposes. diff --git a/src/rax.h b/src/rax.h index 6f91f4c1..f6985c37 100644 --- a/src/rax.h +++ b/src/rax.h @@ -155,6 +155,7 @@ int raxPrev(raxIterator *it); int raxRandomWalk(raxIterator *it, size_t steps); int raxCompare(raxIterator *iter, const char *op, unsigned char *key, size_t key_len); void raxStop(raxIterator *it); +int raxEOF(raxIterator *it); void raxShow(rax *rax); #endif diff --git a/src/rdb.h b/src/rdb.h index 62a13f44..bf115045 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -89,10 +89,11 @@ #define RDB_TYPE_ZSET_ZIPLIST 12 #define RDB_TYPE_HASH_ZIPLIST 13 #define RDB_TYPE_LIST_QUICKLIST 14 +#define RDB_TYPE_STREAM_LISTPACKS 15 /* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */ /* Test if a type is an object type. */ -#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 14)) +#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 15)) /* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */ #define RDB_OPCODE_AUX 250 diff --git a/src/server.c b/src/server.c index 7498a25f..2c3647db 100644 --- a/src/server.c +++ b/src/server.c @@ -302,6 +302,8 @@ struct redisCommand redisCommandTable[] = { {"pfcount",pfcountCommand,-2,"r",0,NULL,1,-1,1,0,0}, {"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0}, {"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0}, + {"xadd",xaddCommand,-4,"wmF",0,NULL,1,1,1,0,0}, + {"xrange",xrangeCommand,-4,"r",0,NULL,1,1,1,0,0}, {"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0} diff --git a/src/server.h b/src/server.h index 11eb36f3..38a76d00 100644 --- a/src/server.h +++ b/src/server.h @@ -59,6 +59,7 @@ typedef long long mstime_t; /* millisecond time type. */ #include "anet.h" /* Networking the easy way */ #include "ziplist.h" /* Compact list data structure */ #include "intset.h" /* Compact integer set structure */ +#include "stream.h" /* Stream data type header file. */ #include "version.h" /* Version macro */ #include "util.h" /* Misc functions useful in many places */ #include "latency.h" /* Latency monitor API */ @@ -451,6 +452,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define OBJ_SET 2 #define OBJ_ZSET 3 #define OBJ_HASH 4 +#define OBJ_STREAM 5 /* The "module" object type is a special one that signals that the object * is one directly managed by a Redis module. In this case the value points @@ -575,6 +577,7 @@ typedef struct RedisModuleDigest { #define OBJ_ENCODING_SKIPLIST 7 /* Encoded as skiplist */ #define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string encoding */ #define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */ +#define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree of listpacks */ #define LRU_BITS 24 #define LRU_CLOCK_MAX ((1<lru */ @@ -1414,6 +1417,9 @@ void handleClientsBlockedOnLists(void); void popGenericCommand(client *c, int where); void signalListAsReady(redisDb *db, robj *key); +/* Stream data type. */ +stream *streamNew(void); + /* MULTI/EXEC/WATCH... */ void unwatchAllKeys(client *c); void initClientMultiState(client *c); @@ -1455,6 +1461,7 @@ robj *createIntsetObject(void); robj *createHashObject(void); robj *createZsetObject(void); robj *createZsetZiplistObject(void); +robj *createStreamObject(void); robj *createModuleObject(moduleType *mt, void *value); int getLongFromObjectOrReply(client *c, robj *o, long *target, const char *msg); int checkType(client *c, robj *o, int type); @@ -1992,6 +1999,8 @@ void pfdebugCommand(client *c); void latencyCommand(client *c); void moduleCommand(client *c); void securityWarningCommand(client *c); +void xaddCommand(client *c); +void xrangeCommand(client *c); #if defined(__GNUC__) void *calloc(size_t count, size_t size) __attribute__ ((deprecated)); diff --git a/src/stream.h b/src/stream.h new file mode 100644 index 00000000..065c328e --- /dev/null +++ b/src/stream.h @@ -0,0 +1,21 @@ +#ifndef STREAM_H +#define STREAM_H + +#include "rax.h" + +/* Stream item ID: a 128 bit number composed of a milliseconds time and + * a sequence counter. IDs generated in the same millisecond (or in a past + * millisecond if the clock jumped backward) will use the millisecond time + * of the latest generated ID and an incremented sequence. */ +typedef struct streamID { + uint64_t ms; /* Unix time in milliseconds. */ + uint64_t seq; /* Sequence number. */ +} streamID; + +typedef struct stream { + rax *rax; /* The radix tree holding the stream. */ + uint64_t length; /* Number of elements inside this stream. */ + streamID last_id; /* Zero if there are yet no items. */ +} stream; + +#endif diff --git a/src/t_stream.c b/src/t_stream.c new file mode 100644 index 00000000..c64f5059 --- /dev/null +++ b/src/t_stream.c @@ -0,0 +1,376 @@ +/* + * Copyright (c) 2017, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/* TODO: + * - After loading a stream, populate the last ID. + */ + +#include "server.h" +#include "listpack.h" +#include "endianconv.h" +#include "stream.h" + +#define STREAM_BYTES_PER_LISTPACK 4096 + +/* ----------------------------------------------------------------------- + * Low level stream encoding: a radix tree of listpacks. + * ----------------------------------------------------------------------- */ + +/* Create a new stream data structure. */ +stream *streamNew(void) { + stream *s = zmalloc(sizeof(*s)); + s->rax = raxNew(); + s->length = 0; + s->last_id.ms = 0; + s->last_id.seq = 0; + return s; +} + +/* Generate the next stream item ID given the previous one. If the current + * milliseconds Unix time is greater than the previous one, just use this + * as time part and start with sequence part of zero. Otherwise we use the + * previous time (and never go backward) and increment the sequence. */ +void streamNextID(streamID *last_id, streamID *new_id) { + uint64_t ms = mstime(); + if (ms > last_id->ms) { + new_id->ms = ms; + new_id->seq = 0; + } else { + new_id->ms = last_id->ms; + new_id->seq = last_id->seq+1; + } +} + +/* This is just a wrapper for lpAppend() to directly use a 64 bit integer + * instead of a string. */ +unsigned char *lpAppendInteger(unsigned char *lp, int64_t value) { + char buf[LONG_STR_SIZE]; + int slen = ll2string(buf,sizeof(buf),value); + return lpAppend(lp,(unsigned char*)buf,slen); +} + +/* This is a wrapper function for lpGet() to directly get an integer value + * from the listpack (that may store numbers as a string), converting + * the string if needed. */ +int64_t lpGetInteger(unsigned char *ele) { + int64_t v; + unsigned char *e = lpGet(ele,&v,NULL); + if (e == NULL) return v; + /* The following code path should never be used for how listpacks work: + * they should always be able to store an int64_t value in integer + * encoded form. However the implementation may change. */ + int retval = string2ll((char*)e,v,&v); + serverAssert(retval != 0); + return v; +} + +/* Convert the specified stream entry ID as a 128 bit big endian number, so + * that the IDs can be sorted lexicographically. */ +void streamEncodeID(void *buf, streamID *id) { + uint64_t e[2]; + e[0] = htonu64(id->ms); + e[1] = htonu64(id->seq); + memcpy(buf,e,sizeof(e)); +} + +/* This is the reverse of streamEncodeID(): the decoded ID will be stored + * in the 'id' structure passed by reference. The buffer 'buf' must point + * to a 128 bit big-endian encoded ID. */ +void streamDecodeID(void *buf, streamID *id) { + uint64_t e[2]; + memcpy(e,buf,sizeof(e)); + id->ms = ntohu64(e[0]); + id->seq = ntohu64(e[1]); +} + +/* Adds a new item into the stream 's' having the specified number of + * field-value pairs as specified in 'numfields' and stored into 'argv'. + * Returns the new entry ID populating the 'added_id' structure. */ +void streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id) { + raxIterator ri; + raxStart(&ri,s->rax); + raxSeek(&ri,"$",NULL,0); + + size_t lp_bytes = 0; /* Total bytes in the tail listpack. */ + unsigned char *lp = NULL; /* Tail listpack pointer. */ + + /* Get a reference to the tail node listpack. */ + if (raxNext(&ri)) { + lp = ri.data; + lp_bytes = lpBytes(lp); + } + raxStop(&ri); + + /* Generate the new entry ID. */ + streamID id; + streamNextID(&s->last_id,&id); + + /* We have to add the key into the radix tree in lexicographic order, + * to do so we consider the ID as a single 128 bit number written in + * big endian, so that the most significant bytes are the first ones. */ + uint64_t rax_key[2]; /* Key in the radix tree containing the listpack.*/ + uint64_t entry_id[2]; /* Entry ID of the new item as 128 bit string. */ + streamEncodeID(entry_id,&id); + + /* Create a new listpack and radix tree node if needed. */ + if (lp == NULL || lp_bytes > STREAM_BYTES_PER_LISTPACK) { + lp = lpNew(); + rax_key[0] = entry_id[0]; + rax_key[1] = entry_id[1]; + raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL); + } else { + serverAssert(ri.key_len == sizeof(rax_key)); + memcpy(rax_key,ri.key,sizeof(rax_key)); + } + + /* Populate the listpack with the new entry. */ + lp = lpAppend(lp,(unsigned char*)entry_id,sizeof(entry_id)); + lp = lpAppendInteger(lp,numfields); + for (int i = 0; i < numfields; i++) { + sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr; + lp = lpAppend(lp,(unsigned char*)field,sdslen(field)); + lp = lpAppend(lp,(unsigned char*)value,sdslen(value)); + } + + /* Insert back into the tree in order to update the listpack pointer. */ + raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL); + s->length++; + s->last_id = id; + if (added_id) *added_id = id; + raxShow(s->rax); +} + +/* Send the specified range to the client 'c'. The range the client will + * receive is between start and end inclusive, if 'count' is non zero, no more + * than 'count' elemnets are sent. The 'end' pointer can be NULL to mean that + * we want all the elements from 'start' till the end of the stream. */ +size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count) { + void *arraylen_ptr = addDeferredMultiBulkLength(c); + size_t arraylen = 0; + + /* Seek the radix tree node that contains our start item. */ + uint64_t key[2]; + uint64_t end_key[2]; + streamEncodeID(key,start); + if (end) streamEncodeID(end_key,end); + raxIterator ri; + raxStart(&ri,s->rax); + + /* Seek the correct node in the radix tree. */ + if (start->ms || start->seq) { + raxSeek(&ri,"<=",(unsigned char*)key,sizeof(key)); + if (raxEOF(&ri)) raxSeek(&ri,">",(unsigned char*)key,sizeof(key)); + } else { + raxSeek(&ri,"^",NULL,0); + } + + /* For every radix tree node, iterate the corresponding listpack, + * returning elmeents when they are within range. */ + while (raxNext(&ri)) { + serverAssert(ri.key_len == sizeof(key)); + unsigned char *lp = ri.data; + unsigned char *lp_ele = lpFirst(lp); + while(lp_ele) { + int64_t e_len; + unsigned char buf[LP_INTBUF_SIZE]; + unsigned char *e = lpGet(lp_ele,&e_len,buf); + serverAssert(e_len == sizeof(streamID)); + + /* Seek next field: number of elements. */ + lp_ele = lpNext(lp,lp_ele); + if (memcmp(e,key,sizeof(key)) >= 0) { /* If current >= start */ + if (end && memcmp(e,end_key,sizeof(key)) > 0) { + break; /* We are already out of range. */ + } + streamID thisid; + streamDecodeID(e,&thisid); + sds replyid = sdscatfmt(sdsempty(),"+%U.%U\r\n", + thisid.ms,thisid.seq); + + /* Emit this stream entry in the client output. */ + addReplyMultiBulkLen(c,2); + addReplySds(c,replyid); + int64_t numfields = lpGetInteger(lp_ele); + lp_ele = lpNext(lp,lp_ele); + addReplyMultiBulkLen(c,numfields*2); + for (int64_t i = 0; i < numfields; i++) { + /* Emit two items (key-value) per iteration. */ + for (int k = 0; k < 2; k++) { + e = lpGet(lp_ele,&e_len,buf); + addReplyBulkCBuffer(c,e,e_len); + lp_ele = lpNext(lp,lp_ele); + } + } + + arraylen++; + if (count && count == arraylen) break; + } else { + /* If we do not emit, we have to discard. */ + int64_t numfields = lpGetInteger(lp_ele); + lp_ele = lpNext(lp,lp_ele); + for (int64_t i = 0; i < numfields*2; i++) + lp_ele = lpNext(lp,lp_ele); + } + } + if (count && count == arraylen) break; + } + raxStop(&ri); + setDeferredMultiBulkLength(c,arraylen_ptr,arraylen); + return arraylen; +} + +/* ----------------------------------------------------------------------- + * Stream commands implementation + * ----------------------------------------------------------------------- */ + +/* Look the stream at 'key' and return the corresponding stream object. + * The function creates a key setting it to an empty stream if needed. */ +robj *streamTypeLookupWriteOrCreate(client *c, robj *key) { + robj *o = lookupKeyWrite(c->db,key); + if (o == NULL) { + o = createStreamObject(); + dbAdd(c->db,key,o); + } else { + if (o->type != OBJ_STREAM) { + addReply(c,shared.wrongtypeerr); + return NULL; + } + } + return o; +} + +/* Helper function to convert a string to an unsigned long long value. + * The function attempts to use the faster string2ll() function inside + * Redis: if it fails, strtoull() is used instead. The function returns + * 1 if the conversion happened successfully or 0 if the number is + * invalid or out of range. */ +int string2ull(const char *s, unsigned long long *value) { + long long ll; + if (string2ll(s,strlen(s),&ll)) { + if (ll < 0) return 0; /* Negative values are out of range. */ + *value = ll; + return 1; + } + errno = 0; + *value = strtoull(s,NULL,10); + if (errno == EINVAL || errno == ERANGE) return 0; /* strtoull() failed. */ + return 1; /* Conversion done! */ +} + +/* Parse a stream ID in the format given by clients to Redis, that is + * ., and converts it into a streamID structure. If + * the specified ID is invalid C_ERR is returned and an error is reported + * to the client, otherwise C_OK is returned. The ID may be in incomplete + * form, just stating the milliseconds time part of the stream. In such a case + * the missing part is set according to the value of 'missing_seq' parameter. + * The IDs "-" and "+" specify respectively the minimum and maximum IDs + * that can be represented. */ +int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) { + char buf[128]; + if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid; + memcpy(buf,o->ptr,sdslen(o->ptr)+1); + + /* Handle the "-" and "+" special cases. */ + if (buf[0] == '-' && buf[1] == '\0') { + id->ms = 0; + id->seq = 0; + return C_OK; + } else if (buf[0] == '+' && buf[1] == '\0') { + id->ms = UINT64_MAX; + id->seq = UINT64_MAX; + return C_OK; + } + + /* Parse . form. */ + char *dot = strchr(buf,'.'); + if (dot) *dot = '\0'; + uint64_t ms, seq; + if (string2ull(buf,&ms) == 0) goto invalid; + if (dot && string2ull(dot+1,&seq) == 0) goto invalid; + if (!dot) seq = missing_seq; + id->ms = ms; + id->seq = seq; + return C_OK; + +invalid: + addReplyError(c,"Invalid stream ID specified as stream command argument"); + return C_ERR; +} + +/* XADD key [field value] [field value] ... */ +void xaddCommand(client *c) { + if ((c->argc % 2) == 1) { + addReplyError(c,"wrong number of arguments for XADD"); + return; + } + + /* Lookup the stream at key. */ + robj *o; + stream *s; + if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; + s = o->ptr; + + /* Append using the low level function and return the ID. */ + streamID id; + streamAppendItem(s,c->argv+2,(c->argc-2)/2,&id); + sds reply = sdscatfmt(sdsempty(),"+%U.%U\r\n",id.ms,id.seq); + addReplySds(c,reply); + + signalModifiedKey(c->db,c->argv[1]); + notifyKeyspaceEvent(NOTIFY_HASH,"xadd",c->argv[1],c->db->id); + server.dirty++; +} + +/* XRANGE key start end [COUNT ] */ +void xrangeCommand(client *c) { + robj *o; + stream *s; + streamID startid, endid; + long long count = 0; + + if (streamParseIDOrReply(c,c->argv[2],&startid,0) == C_ERR) return; + if (streamParseIDOrReply(c,c->argv[3],&endid,UINT64_MAX) == C_ERR) return; + + /* Parse the COUNT option if any. */ + if (c->argc > 4) { + if (strcasecmp(c->argv[4]->ptr,"COUNT") == 0) { + if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) != C_OK) + return; + } else { + addReply(c,shared.syntaxerr); + return; + } + } + + /* Return the specified range to the user. */ + if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL + || checkType(c,o,OBJ_STREAM)) return; + s = o->ptr; + streamReplyWithRange(c,s,&startid,&endid,count); +}