mirror of
https://github.com/fluencelabs/redis
synced 2025-03-17 08:00:49 +00:00
Streams: 12 commits squashed into the initial Streams implementation.
This commit is contained in:
parent
045d65c3af
commit
79866a6361
@ -144,7 +144,7 @@ endif
|
||||
|
||||
REDIS_SERVER_NAME=redis-server
|
||||
REDIS_SENTINEL_NAME=redis-sentinel
|
||||
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o
|
||||
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.c
|
||||
REDIS_CLI_NAME=redis-cli
|
||||
REDIS_CLI_OBJ=anet.o adlist.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o
|
||||
REDIS_BENCHMARK_NAME=redis-benchmark
|
||||
|
783
src/listpack.c
Normal file
783
src/listpack.c
Normal file
@ -0,0 +1,783 @@
|
||||
/* Listpack -- A lists of strings serialization format
|
||||
*
|
||||
* This file implements the specification you can find at:
|
||||
*
|
||||
* https://github.com/antirez/listpack
|
||||
*
|
||||
* Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com>
|
||||
* All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright notice,
|
||||
* this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above copyright
|
||||
* notice, this list of conditions and the following disclaimer in the
|
||||
* documentation and/or other materials provided with the distribution.
|
||||
* * Neither the name of Redis nor the names of its contributors may be used
|
||||
* to endorse or promote products derived from this software without
|
||||
* specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||||
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
||||
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
||||
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
||||
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
* POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
#include <stdint.h>
|
||||
#include <limits.h>
|
||||
#include <sys/types.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <stdio.h>
|
||||
|
||||
#include "listpack.h"
|
||||
#include "listpack_malloc.h"
|
||||
|
||||
#define LP_HDR_SIZE 6 /* 32 bit total len + 16 bit number of elements. */
|
||||
#define LP_HDR_NUMELE_UNKNOWN UINT16_MAX
|
||||
#define LP_MAX_INT_ENCODING_LEN 9
|
||||
#define LP_MAX_BACKLEN_SIZE 5
|
||||
#define LP_MAX_ENTRY_BACKLEN 34359738367ULL
|
||||
#define LP_ENCODING_INT 0
|
||||
#define LP_ENCODING_STRING 1
|
||||
|
||||
#define LP_ENCODING_7BIT_UINT 0
|
||||
#define LP_ENCODING_7BIT_UINT_MASK 0x80
|
||||
#define LP_ENCODING_IS_7BIT_UINT(byte) (((byte)&LP_ENCODING_7BIT_UINT_MASK)==LP_ENCODING_7BIT_UINT)
|
||||
|
||||
#define LP_ENCODING_6BIT_STR 0x80
|
||||
#define LP_ENCODING_6BIT_STR_MASK 0xC0
|
||||
#define LP_ENCODING_IS_6BIT_STR(byte) (((byte)&LP_ENCODING_6BIT_STR_MASK)==LP_ENCODING_6BIT_STR)
|
||||
|
||||
#define LP_ENCODING_13BIT_INT 0xC0
|
||||
#define LP_ENCODING_13BIT_INT_MASK 0xE0
|
||||
#define LP_ENCODING_IS_13BIT_INT(byte) (((byte)&LP_ENCODING_13BIT_INT_MASK)==LP_ENCODING_13BIT_INT)
|
||||
|
||||
#define LP_ENCODING_12BIT_STR 0xE0
|
||||
#define LP_ENCODING_12BIT_STR_MASK 0xF0
|
||||
#define LP_ENCODING_IS_12BIT_STR(byte) (((byte)&LP_ENCODING_12BIT_STR_MASK)==LP_ENCODING_12BIT_STR)
|
||||
|
||||
#define LP_ENCODING_16BIT_INT 0xF1
|
||||
#define LP_ENCODING_16BIT_INT_MASK 0xFF
|
||||
#define LP_ENCODING_IS_16BIT_INT(byte) (((byte)&LP_ENCODING_16BIT_INT_MASK)==LP_ENCODING_16BIT_INT)
|
||||
|
||||
#define LP_ENCODING_24BIT_INT 0xF2
|
||||
#define LP_ENCODING_24BIT_INT_MASK 0xFF
|
||||
#define LP_ENCODING_IS_24BIT_INT(byte) (((byte)&LP_ENCODING_24BIT_INT_MASK)==LP_ENCODING_24BIT_INT)
|
||||
|
||||
#define LP_ENCODING_32BIT_INT 0xF3
|
||||
#define LP_ENCODING_32BIT_INT_MASK 0xFF
|
||||
#define LP_ENCODING_IS_32BIT_INT(byte) (((byte)&LP_ENCODING_32BIT_INT_MASK)==LP_ENCODING_32BIT_INT)
|
||||
|
||||
#define LP_ENCODING_64BIT_INT 0xF4
|
||||
#define LP_ENCODING_64BIT_INT_MASK 0xFF
|
||||
#define LP_ENCODING_IS_64BIT_INT(byte) (((byte)&LP_ENCODING_64BIT_INT_MASK)==LP_ENCODING_64BIT_INT)
|
||||
|
||||
#define LP_ENCODING_32BIT_STR 0xF0
|
||||
#define LP_ENCODING_32BIT_STR_MASK 0xFF
|
||||
#define LP_ENCODING_IS_32BIT_STR(byte) (((byte)&LP_ENCODING_32BIT_STR_MASK)==LP_ENCODING_32BIT_STR)
|
||||
|
||||
#define LP_EOF 0xFF
|
||||
|
||||
#define LP_ENCODING_6BIT_STR_LEN(p) ((p)[0] & 0x3F)
|
||||
#define LP_ENCODING_12BIT_STR_LEN(p) ((((p)[0] & 0xF) << 8) | (p)[1])
|
||||
#define LP_ENCODING_32BIT_STR_LEN(p) (((uint32_t)(p)[1]<<0) | \
|
||||
((uint32_t)(p)[2]<<8) | \
|
||||
((uint32_t)(p)[3]<<16) | \
|
||||
((uint32_t)(p)[4]<<24))
|
||||
|
||||
#define lpGetTotalBytes(p) (((uint32_t)(p)[0]<<0) | \
|
||||
((uint32_t)(p)[1]<<8) | \
|
||||
((uint32_t)(p)[2]<<16) | \
|
||||
((uint32_t)(p)[3]<<24))
|
||||
|
||||
#define lpGetNumElements(p) (((uint32_t)(p)[4]<<0) | \
|
||||
((uint32_t)(p)[5]<<8))
|
||||
#define lpSetTotalBytes(p,v) do { \
|
||||
(p)[0] = (v)&0xff; \
|
||||
(p)[1] = ((v)>>8)&0xff; \
|
||||
(p)[2] = ((v)>>16)&0xff; \
|
||||
(p)[3] = ((v)>>24)&0xff; \
|
||||
} while(0)
|
||||
|
||||
#define lpSetNumElements(p,v) do { \
|
||||
(p)[4] = (v)&0xff; \
|
||||
(p)[5] = ((v)>>8)&0xff; \
|
||||
} while(0)
|
||||
|
||||
/* Convert a string into a signed 64 bit integer.
|
||||
* The function returns 1 if the string could be parsed into a (non-overflowing)
|
||||
* signed 64 bit int, 0 otherwise. The 'value' will be set to the parsed value
|
||||
* when the function returns success.
|
||||
*
|
||||
* Note that this function demands that the string strictly represents
|
||||
* a int64 value: no spaces or other characters before or after the string
|
||||
* representing the number are accepted, nor zeroes at the start if not
|
||||
* for the string "0" representing the zero number.
|
||||
*
|
||||
* Because of its strictness, it is safe to use this function to check if
|
||||
* you can convert a string into a long long, and obtain back the string
|
||||
* from the number without any loss in the string representation. *
|
||||
*
|
||||
* -----------------------------------------------------------------------------
|
||||
*
|
||||
* Credits: this function was adapted from the Redis source code, file
|
||||
* "utils.c", function string2ll(), and is copyright:
|
||||
*
|
||||
* Copyright(C) 2011, Pieter Noordhuis
|
||||
* Copyright(C) 2011, Salvatore Sanfilippo
|
||||
*
|
||||
* The function is released under the BSD 3-clause license.
|
||||
*/
|
||||
int lpStringToInt64(const char *s, unsigned long slen, int64_t *value) {
|
||||
const char *p = s;
|
||||
unsigned long plen = 0;
|
||||
int negative = 0;
|
||||
uint64_t v;
|
||||
|
||||
if (plen == slen)
|
||||
return 0;
|
||||
|
||||
/* Special case: first and only digit is 0. */
|
||||
if (slen == 1 && p[0] == '0') {
|
||||
if (value != NULL) *value = 0;
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (p[0] == '-') {
|
||||
negative = 1;
|
||||
p++; plen++;
|
||||
|
||||
/* Abort on only a negative sign. */
|
||||
if (plen == slen)
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* First digit should be 1-9, otherwise the string should just be 0. */
|
||||
if (p[0] >= '1' && p[0] <= '9') {
|
||||
v = p[0]-'0';
|
||||
p++; plen++;
|
||||
} else if (p[0] == '0' && slen == 1) {
|
||||
*value = 0;
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
|
||||
while (plen < slen && p[0] >= '0' && p[0] <= '9') {
|
||||
if (v > (UINT64_MAX / 10)) /* Overflow. */
|
||||
return 0;
|
||||
v *= 10;
|
||||
|
||||
if (v > (UINT64_MAX - (p[0]-'0'))) /* Overflow. */
|
||||
return 0;
|
||||
v += p[0]-'0';
|
||||
|
||||
p++; plen++;
|
||||
}
|
||||
|
||||
/* Return if not all bytes were used. */
|
||||
if (plen < slen)
|
||||
return 0;
|
||||
|
||||
if (negative) {
|
||||
if (v > ((uint64_t)(-(INT64_MIN+1))+1)) /* Overflow. */
|
||||
return 0;
|
||||
if (value != NULL) *value = -v;
|
||||
} else {
|
||||
if (v > INT64_MAX) /* Overflow. */
|
||||
return 0;
|
||||
if (value != NULL) *value = v;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Create a new, empty listpack.
|
||||
* On success the new listpack is returned, otherwise an error is returned. */
|
||||
unsigned char *lpNew(void) {
|
||||
unsigned char *lp = lp_malloc(LP_HDR_SIZE+1);
|
||||
if (lp == NULL) return NULL;
|
||||
lpSetTotalBytes(lp,LP_HDR_SIZE+1);
|
||||
lpSetNumElements(lp,0);
|
||||
lp[LP_HDR_SIZE] = LP_EOF;
|
||||
return lp;
|
||||
}
|
||||
|
||||
/* Free the specified listpack. */
|
||||
void lpFree(unsigned char *lp) {
|
||||
lp_free(lp);
|
||||
}
|
||||
|
||||
/* Given an element 'ele' of size 'size', determine if the element can be
|
||||
* represented inside the listpack encoded as integer, and returns
|
||||
* LP_ENCODING_INT if so. Otherwise returns LP_ENCODING_STR if no integer
|
||||
* encoding is possible.
|
||||
*
|
||||
* If the LP_ENCODING_INT is returned, the function stores the integer encoded
|
||||
* representation of the element in the 'intenc' buffer.
|
||||
*
|
||||
* Regardless of the returned encoding, 'enclen' is populated by reference to
|
||||
* the number of bytes that the string or integer encoded element will require
|
||||
* in order to be represented. */
|
||||
int lpEncodeGetType(unsigned char *ele, uint32_t size, unsigned char *intenc, uint64_t *enclen) {
|
||||
int64_t v;
|
||||
if (lpStringToInt64((const char*)ele, size, &v)) {
|
||||
if (v >= 0 && v <= 127) {
|
||||
/* Single byte 0-127 integer. */
|
||||
intenc[0] = v;
|
||||
*enclen = 1;
|
||||
} else if (v >= -4096 && v <= 4095) {
|
||||
/* 13 bit integer. */
|
||||
if (v < 0) v = ((int64_t)1<<13)+v;
|
||||
intenc[0] = (v>>8)|LP_ENCODING_13BIT_INT;
|
||||
intenc[1] = v&0xff;
|
||||
*enclen = 2;
|
||||
} else if (v >= -32768 && v <= 32767) {
|
||||
/* 16 bit integer. */
|
||||
if (v < 0) v = ((int64_t)1<<16)+v;
|
||||
intenc[0] = LP_ENCODING_16BIT_INT;
|
||||
intenc[1] = v&0xff;
|
||||
intenc[2] = v>>8;
|
||||
*enclen = 3;
|
||||
} else if (v >= -8388608 && v <= 8388607) {
|
||||
/* 24 bit integer. */
|
||||
if (v < 0) v = ((int64_t)1<<24)+v;
|
||||
intenc[0] = LP_ENCODING_24BIT_INT;
|
||||
intenc[1] = v&0xff;
|
||||
intenc[2] = (v>>8)&0xff;
|
||||
intenc[3] = v>>16;
|
||||
*enclen = 4;
|
||||
} else if (v >= -2147483648 && v <= 2147483647) {
|
||||
/* 32 bit integer. */
|
||||
if (v < 0) v = ((int64_t)1<<32)+v;
|
||||
intenc[0] = LP_ENCODING_32BIT_INT;
|
||||
intenc[1] = v&0xff;
|
||||
intenc[2] = (v>>8)&0xff;
|
||||
intenc[3] = (v>>16)&0xff;
|
||||
intenc[4] = v>>24;
|
||||
*enclen = 5;
|
||||
} else {
|
||||
/* 64 bit integer. */
|
||||
uint64_t uv = v;
|
||||
intenc[0] = LP_ENCODING_64BIT_INT;
|
||||
intenc[1] = uv&0xff;
|
||||
intenc[2] = (uv>>8)&0xff;
|
||||
intenc[3] = (uv>>16)&0xff;
|
||||
intenc[4] = (uv>>24)&0xff;
|
||||
intenc[5] = (uv>>32)&0xff;
|
||||
intenc[6] = (uv>>40)&0xff;
|
||||
intenc[7] = (uv>>48)&0xff;
|
||||
intenc[8] = uv>>56;
|
||||
*enclen = 9;
|
||||
}
|
||||
return LP_ENCODING_INT;
|
||||
} else {
|
||||
if (size < 64) *enclen = 1+size;
|
||||
else if (size < 4096) *enclen = 2+size;
|
||||
else *enclen = 4+size;
|
||||
return LP_ENCODING_STRING;
|
||||
}
|
||||
}
|
||||
|
||||
/* Store a reverse-encoded variable length field, representing the length
|
||||
* of the previous element of size 'l', in the target buffer 'buf'.
|
||||
* The function returns the number of bytes used to encode it, from
|
||||
* 1 to 5. If 'buf' is NULL the funciton just returns the number of bytes
|
||||
* needed in order to encode the backlen. */
|
||||
unsigned long lpEncodeBacklen(unsigned char *buf, uint64_t l) {
|
||||
if (l <= 127) {
|
||||
if (buf) buf[0] = l;
|
||||
return 1;
|
||||
} else if (l < 16383) {
|
||||
if (buf) {
|
||||
buf[0] = l>>7;
|
||||
buf[1] = (l&127)|128;
|
||||
}
|
||||
return 2;
|
||||
} else if (l < 2097151) {
|
||||
if (buf) {
|
||||
buf[0] = l>>14;
|
||||
buf[1] = ((l>>7)&127)|128;
|
||||
buf[2] = (l&127)|128;
|
||||
}
|
||||
return 3;
|
||||
} else if (l < 268435455) {
|
||||
if (buf) {
|
||||
buf[0] = l>>21;
|
||||
buf[1] = ((l>>14)&127)|128;
|
||||
buf[2] = ((l>>7)&127)|128;
|
||||
buf[3] = (l&127)|128;
|
||||
}
|
||||
return 4;
|
||||
} else {
|
||||
if (buf) {
|
||||
buf[0] = l>>28;
|
||||
buf[1] = ((l>>21)&127)|128;
|
||||
buf[2] = ((l>>14)&127)|128;
|
||||
buf[3] = ((l>>7)&127)|128;
|
||||
buf[4] = (l&127)|128;
|
||||
}
|
||||
return 5;
|
||||
}
|
||||
}
|
||||
|
||||
/* Decode the backlen and returns it. If the encoding looks invalid (more than
|
||||
* 5 bytes are used), UINT64_MAX is returned to report the problem. */
|
||||
uint64_t lpDecodeBacklen(unsigned char *p) {
|
||||
uint64_t val = 0;
|
||||
uint64_t shift = 0;
|
||||
do {
|
||||
val |= (uint64_t)(p[0] & 127) << shift;
|
||||
if (!(p[0] & 128)) break;
|
||||
shift += 7;
|
||||
p--;
|
||||
if (shift > 28) return UINT64_MAX;
|
||||
} while(1);
|
||||
return val;
|
||||
}
|
||||
|
||||
/* Encode the string element pointed by 's' of size 'len' in the target
|
||||
* buffer 's'. The function should be called with 'buf' having always enough
|
||||
* space for encoding the string. This is done by calling lpEncodeGetType()
|
||||
* before calling this function. */
|
||||
void lpEncodeString(unsigned char *buf, unsigned char *s, uint32_t len) {
|
||||
if (len < 64) {
|
||||
buf[0] = len | LP_ENCODING_6BIT_STR;
|
||||
memcpy(buf+1,s,len);
|
||||
} else if (len < 4096) {
|
||||
buf[0] = (len >> 8) | LP_ENCODING_12BIT_STR;
|
||||
buf[1] = len & 0xff;
|
||||
memcpy(buf+2,s,len);
|
||||
} else {
|
||||
buf[0] = LP_ENCODING_32BIT_STR;
|
||||
buf[1] = len & 0xff;
|
||||
buf[2] = (len >> 8) & 0xff;
|
||||
buf[3] = (len >> 16) & 0xff;
|
||||
buf[4] = (len >> 24) & 0xff;
|
||||
memcpy(buf+4,s,len);
|
||||
}
|
||||
}
|
||||
|
||||
/* Return the encoded length of the listpack element pointed by 'p'. If the
|
||||
* element encoding is wrong then 0 is returned. */
|
||||
uint32_t lpCurrentEncodedSize(unsigned char *p) {
|
||||
if (LP_ENCODING_IS_7BIT_UINT(p[0])) return 1;
|
||||
if (LP_ENCODING_IS_6BIT_STR(p[0])) return 1+LP_ENCODING_6BIT_STR_LEN(p);
|
||||
if (LP_ENCODING_IS_13BIT_INT(p[0])) return 2;
|
||||
if (LP_ENCODING_IS_16BIT_INT(p[0])) return 3;
|
||||
if (LP_ENCODING_IS_24BIT_INT(p[0])) return 4;
|
||||
if (LP_ENCODING_IS_32BIT_INT(p[0])) return 5;
|
||||
if (LP_ENCODING_IS_64BIT_INT(p[0])) return 9;
|
||||
if (LP_ENCODING_IS_12BIT_STR(p[0])) return 2+LP_ENCODING_12BIT_STR_LEN(p);
|
||||
if (LP_ENCODING_IS_32BIT_STR(p[0])) return 5+LP_ENCODING_32BIT_STR_LEN(p);
|
||||
if (p[0] == LP_EOF) return 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Skip the current entry returning the next. It is invalid to call this
|
||||
* function if the current element is the EOF element at the end of the
|
||||
* listpack, however, while this function is used to implement lpNext(),
|
||||
* it does not return NULL when the EOF element is encountered. */
|
||||
unsigned char *lpSkip(unsigned char *p) {
|
||||
unsigned long entrylen = lpCurrentEncodedSize(p);
|
||||
entrylen += lpEncodeBacklen(NULL,entrylen);
|
||||
p += entrylen;
|
||||
return p;
|
||||
}
|
||||
|
||||
/* If 'p' points to an element of the listpack, calling lpNext() will return
|
||||
* the pointer to the next element (the one on the right), or NULL if 'p'
|
||||
* already pointed to the last element of the listpack. */
|
||||
unsigned char *lpNext(unsigned char *lp, unsigned char *p) {
|
||||
((void) lp); /* lp is not used for now. However lpPrev() uses it. */
|
||||
p = lpSkip(p);
|
||||
if (p[0] == LP_EOF) return NULL;
|
||||
return p;
|
||||
}
|
||||
|
||||
/* If 'p' points to an element of the listpack, calling lpPrev() will return
|
||||
* the pointer to the preivous element (the one on the left), or NULL if 'p'
|
||||
* already pointed to the first element of the listpack. */
|
||||
unsigned char *lpPrev(unsigned char *lp, unsigned char *p) {
|
||||
if (p-lp == LP_HDR_SIZE) return NULL;
|
||||
p--; /* Seek the first backlen byte of the last element. */
|
||||
uint64_t prevlen = lpDecodeBacklen(p);
|
||||
prevlen += lpEncodeBacklen(NULL,prevlen);
|
||||
return p-prevlen+1; /* Seek the first byte of the previous entry. */
|
||||
}
|
||||
|
||||
/* Return a pointer to the first element of the listpack, or NULL if the
|
||||
* listpack has no elements. */
|
||||
unsigned char *lpFirst(unsigned char *lp) {
|
||||
lp += LP_HDR_SIZE; /* Skip the header. */
|
||||
if (lp[0] == LP_EOF) return NULL;
|
||||
return lp;
|
||||
}
|
||||
|
||||
/* Return a pointer to the last element of the listpack, or NULL if the
|
||||
* listpack has no elements. */
|
||||
unsigned char *lpLast(unsigned char *lp) {
|
||||
unsigned char *p = lp+lpGetTotalBytes(lp)-1; /* Seek EOF element. */
|
||||
return lpPrev(lp,p); /* Will return NULL if EOF is the only element. */
|
||||
}
|
||||
|
||||
/* Return the number of elements inside the listpack. This function attempts
|
||||
* to use the cached value when within range, otherwise a full scan is
|
||||
* needed. As a side effect of calling this function, the listpack header
|
||||
* could be modified, because if the count is found to be already within
|
||||
* the 'numele' header field range, the new value is set. */
|
||||
uint32_t lpLength(unsigned char *lp) {
|
||||
uint32_t numele = lpGetNumElements(lp);
|
||||
if (numele != LP_HDR_NUMELE_UNKNOWN) return numele;
|
||||
|
||||
/* Too many elements inside the listpack. We need to scan in order
|
||||
* to get the total number. */
|
||||
uint32_t count = 0;
|
||||
unsigned char *p = lpFirst(lp);
|
||||
while(p) {
|
||||
count++;
|
||||
p = lpNext(lp,p);
|
||||
}
|
||||
|
||||
/* If the count is again within range of the header numele field,
|
||||
* set it. */
|
||||
if (count < LP_HDR_NUMELE_UNKNOWN) lpSetNumElements(lp,count);
|
||||
return count;
|
||||
}
|
||||
|
||||
/* Return the listpack element pointed by 'p'.
|
||||
*
|
||||
* The function changes behavior depending on the passed 'intbuf' value.
|
||||
* Specifically, if 'intbuf' is NULL:
|
||||
*
|
||||
* If the element is internally encoded as an integer, the function returns
|
||||
* NULL and populates the integer value by reference in 'count'. Otherwise if
|
||||
* the element is encoded as a string a pointer to the string (pointing inside
|
||||
* the listpack itself) is returned, and 'count' is set to the length of the
|
||||
* string.
|
||||
*
|
||||
* If instead 'intbuf' points to a buffer passed by the caller, that must be
|
||||
* at least LP_INTBUF_SIZE bytes, the function always returns the element as
|
||||
* it was a string (returning the pointer to the string and setting the
|
||||
* 'count' argument to the string length by reference). However if the element
|
||||
* is encoded as an integer, the 'intbuf' buffer is used in order to store
|
||||
* the string representation.
|
||||
*
|
||||
* The user should use one or the other form depending on what the value will
|
||||
* be used for. If there is immediate usage for an integer value returned
|
||||
* by the function, than to pass a buffer (and convert it back to a number)
|
||||
* is of course useless.
|
||||
*
|
||||
* If the function is called against a badly encoded ziplist, so that there
|
||||
* is no valid way to parse it, the function returns like if there was an
|
||||
* integer encoded with value 12345678900000000 + <unrecognized byte>, this may
|
||||
* be an hint to understand that something is wrong. To crash in this case is
|
||||
* not sensible because of the different requirements of the application using
|
||||
* this lib.
|
||||
*
|
||||
* Similarly, there is no error returned since the listpack normally can be
|
||||
* assumed to be valid, so that would be a very high API cost. However a function
|
||||
* in order to check the integrity of the listpack at load time is provided,
|
||||
* check lpIsValid(). */
|
||||
unsigned char *lpGet(unsigned char *p, int64_t *count, unsigned char *intbuf) {
|
||||
int64_t val;
|
||||
uint64_t uval, negstart, negmax;
|
||||
|
||||
if (LP_ENCODING_IS_7BIT_UINT(p[0])) {
|
||||
negstart = UINT64_MAX; /* 7 bit ints are always positive. */
|
||||
negmax = 0;
|
||||
uval = p[0] & 0x7f;
|
||||
} else if (LP_ENCODING_IS_6BIT_STR(p[0])) {
|
||||
*count = LP_ENCODING_6BIT_STR_LEN(p);
|
||||
return p+1;
|
||||
} else if (LP_ENCODING_IS_13BIT_INT(p[0])) {
|
||||
uval = ((p[0]&0x1f)<<8) | p[1];
|
||||
negstart = (uint64_t)1<<12;
|
||||
negmax = 8191;
|
||||
} else if (LP_ENCODING_IS_16BIT_INT(p[0])) {
|
||||
uval = (uint64_t)p[1] |
|
||||
(uint64_t)p[2]<<8;
|
||||
negstart = (uint64_t)1<<15;
|
||||
negmax = UINT16_MAX;
|
||||
} else if (LP_ENCODING_IS_24BIT_INT(p[0])) {
|
||||
uval = (uint64_t)p[1] |
|
||||
(uint64_t)p[2]<<8 |
|
||||
(uint64_t)p[3]<<16;
|
||||
negstart = (uint64_t)1<<23;
|
||||
negmax = UINT32_MAX>>8;
|
||||
} else if (LP_ENCODING_IS_32BIT_INT(p[0])) {
|
||||
uval = (uint64_t)p[1] |
|
||||
(uint64_t)p[2]<<8 |
|
||||
(uint64_t)p[3]<<16 |
|
||||
(uint64_t)p[4]<<24;
|
||||
negstart = (uint64_t)1<<31;
|
||||
negmax = UINT32_MAX;
|
||||
} else if (LP_ENCODING_IS_64BIT_INT(p[0])) {
|
||||
uval = (uint64_t)p[1] |
|
||||
(uint64_t)p[2]<<8 |
|
||||
(uint64_t)p[3]<<16 |
|
||||
(uint64_t)p[4]<<24 |
|
||||
(uint64_t)p[5]<<32 |
|
||||
(uint64_t)p[6]<<40 |
|
||||
(uint64_t)p[7]<<48 |
|
||||
(uint64_t)p[8]<<56;
|
||||
negstart = (uint64_t)1<<63;
|
||||
negmax = UINT64_MAX;
|
||||
} else if (LP_ENCODING_IS_12BIT_STR(p[0])) {
|
||||
*count = LP_ENCODING_12BIT_STR_LEN(p);
|
||||
return p+2;
|
||||
} else if (LP_ENCODING_IS_32BIT_STR(p[0])) {
|
||||
*count = LP_ENCODING_32BIT_STR_LEN(p);
|
||||
return p+5;
|
||||
} else {
|
||||
uval = 12345678900000000ULL + p[0];
|
||||
negstart = UINT64_MAX;
|
||||
negmax = 0;
|
||||
}
|
||||
|
||||
/* We reach this code path only for integer encodings.
|
||||
* Convert the unsigned value to the signed one using two's complement
|
||||
* rule. */
|
||||
if (uval >= negstart) {
|
||||
/* This three steps conversion should avoid undefined behaviors
|
||||
* in the unsigned -> signed conversion. */
|
||||
uval = negmax-uval;
|
||||
val = uval;
|
||||
val = -val-1;
|
||||
} else {
|
||||
val = uval;
|
||||
}
|
||||
|
||||
/* Return the string representation of the integer or the value itself
|
||||
* depending on intbuf being NULL or not. */
|
||||
if (intbuf) {
|
||||
*count = snprintf((char*)intbuf,LP_INTBUF_SIZE,"%lld",val);
|
||||
return intbuf;
|
||||
} else {
|
||||
*count = val;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/* Insert, delete or replace the specified element 'ele' of lenght 'len' at
|
||||
* the specified position 'p', with 'p' being a listpack element pointer
|
||||
* obtained with lpFirst(), lpLast(), lpIndex(), lpNext(), lpPrev() or
|
||||
* lpSeek().
|
||||
*
|
||||
* The element is inserted before, after, or replaces the element pointed
|
||||
* by 'p' depending on the 'where' argument, that can be LP_BEFORE, LP_AFTER
|
||||
* or LP_REPLACE.
|
||||
*
|
||||
* If 'ele' is set to NULL, the function removes the element pointed by 'p'
|
||||
* instead of inserting one.
|
||||
*
|
||||
* Returns NULL on out of memory or when the listpack total length would exceed
|
||||
* the max allowed size of 2^32-1, otherwise the new pointer to the listpack
|
||||
* holding the new element is returned (and the old pointer passed is no longer
|
||||
* considered valid)
|
||||
*
|
||||
* If 'newp' is not NULL, at the end of a successful call '*newp' will be set
|
||||
* to the address of the element just added, so that it will be possible to
|
||||
* continue an interation with lpNext() and lpPrev().
|
||||
*
|
||||
* For deletion operations ('ele' set to NULL) 'newp' is set to the next
|
||||
* element, on the right of the deleted one, or to NULL if the deleted element
|
||||
* was the last one. */
|
||||
unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, unsigned char *p, int where, unsigned char **newp) {
|
||||
unsigned char intenc[LP_MAX_INT_ENCODING_LEN];
|
||||
unsigned char backlen[LP_MAX_BACKLEN_SIZE];
|
||||
|
||||
uint64_t enclen; /* The length of the encoded element. */
|
||||
|
||||
/* An element pointer set to NULL means deletion, which is conceptually
|
||||
* replacing the element with a zero-length element. So whatever we
|
||||
* get passed as 'where', set it to LP_REPLACE. */
|
||||
if (ele == NULL) where = LP_REPLACE;
|
||||
|
||||
/* If we need to insert after the current element, we just jump to the
|
||||
* next element (that could be the EOF one) and handle the case of
|
||||
* inserting before. So the function will actually deal with just two
|
||||
* cases: LP_BEFORE and LP_REPLACE. */
|
||||
if (where == LP_AFTER) {
|
||||
p = lpSkip(p);
|
||||
where = LP_BEFORE;
|
||||
}
|
||||
|
||||
/* Store the offset of the element 'p', so that we can obtain its
|
||||
* address again after a reallocation. */
|
||||
unsigned long poff = p-lp;
|
||||
|
||||
/* Calling lpEncodeGetType() results into the encoded version of the
|
||||
* element to be stored into 'intenc' in case it is representable as
|
||||
* an integer: in that case, the function returns LP_ENCODING_INT.
|
||||
* Otherwise if LP_ENCODING_STR is returned, we'll have to call
|
||||
* lpEncodeString() to actually write the encoded string on place later.
|
||||
*
|
||||
* Whatever the returned encoding is, 'enclen' is populated with the
|
||||
* length of the encoded element. */
|
||||
int enctype;
|
||||
if (ele) {
|
||||
enctype = lpEncodeGetType(ele,size,intenc,&enclen);
|
||||
} else {
|
||||
enctype = -1;
|
||||
enclen = 0;
|
||||
}
|
||||
|
||||
/* We need to also encode the backward-parsable length of the element
|
||||
* and append it to the end: this allows to traverse the listpack from
|
||||
* the end to the start. */
|
||||
unsigned long backlen_size = ele ? lpEncodeBacklen(backlen,enclen) : 0;
|
||||
uint64_t old_listpack_bytes = lpGetTotalBytes(lp);
|
||||
uint32_t replaced_len = 0;
|
||||
if (where == LP_REPLACE) {
|
||||
replaced_len = lpCurrentEncodedSize(p);
|
||||
replaced_len += lpEncodeBacklen(NULL,replaced_len);
|
||||
}
|
||||
|
||||
uint64_t new_listpack_bytes = old_listpack_bytes + enclen + backlen_size
|
||||
- replaced_len;
|
||||
if (new_listpack_bytes > UINT32_MAX) return NULL;
|
||||
|
||||
/* We now need to reallocate in order to make space or shrink the
|
||||
* allocation (in case 'when' value is LP_REPLACE and the new element is
|
||||
* smaller). However we do that before memmoving the memory to
|
||||
* make room for the new element if the final allocation will get
|
||||
* larger, or we do it after if the final allocation will get smaller. */
|
||||
|
||||
unsigned char *dst = lp + poff; /* May be updated after reallocation. */
|
||||
|
||||
/* Realloc before: we need more room. */
|
||||
if (new_listpack_bytes > old_listpack_bytes) {
|
||||
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
|
||||
dst = lp + poff;
|
||||
}
|
||||
|
||||
/* Setup the listpack relocating the elements to make the exact room
|
||||
* we need to store the new one. */
|
||||
if (where == LP_BEFORE) {
|
||||
memmove(dst+enclen+backlen_size,dst,old_listpack_bytes-poff);
|
||||
} else { /* LP_REPLACE. */
|
||||
long lendiff = (enclen+backlen_size)-replaced_len;
|
||||
memmove(dst+replaced_len+lendiff,
|
||||
dst+replaced_len,
|
||||
old_listpack_bytes-poff-replaced_len);
|
||||
}
|
||||
|
||||
/* Realloc after: we need to free space. */
|
||||
if (new_listpack_bytes < old_listpack_bytes) {
|
||||
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
|
||||
dst = lp + poff;
|
||||
}
|
||||
|
||||
/* Store the entry. */
|
||||
if (newp) {
|
||||
*newp = dst;
|
||||
/* In case of deletion, set 'newp' to NULL if the next element is
|
||||
* the EOF element. */
|
||||
if (!ele && dst[0] == LP_EOF) *newp = NULL;
|
||||
}
|
||||
if (ele) {
|
||||
if (enctype == LP_ENCODING_INT) {
|
||||
memcpy(dst,intenc,enclen);
|
||||
} else {
|
||||
lpEncodeString(dst,ele,size);
|
||||
}
|
||||
dst += enclen;
|
||||
memcpy(dst,backlen,backlen_size);
|
||||
dst += backlen_size;
|
||||
}
|
||||
|
||||
/* Update header. */
|
||||
if (where != LP_REPLACE || ele == NULL) {
|
||||
uint32_t num_elements = lpGetNumElements(lp);
|
||||
if (num_elements != LP_HDR_NUMELE_UNKNOWN) {
|
||||
if (ele)
|
||||
lpSetNumElements(lp,num_elements+1);
|
||||
else
|
||||
lpSetNumElements(lp,num_elements-1);
|
||||
}
|
||||
}
|
||||
lpSetTotalBytes(lp,new_listpack_bytes);
|
||||
return lp;
|
||||
}
|
||||
|
||||
/* Append the specified element 'ele' of lenght 'len' at the end of the
|
||||
* listpack. It is implemented in terms of lpInsert(), so the return value is
|
||||
* the same as lpInsert(). */
|
||||
unsigned char *lpAppend(unsigned char *lp, unsigned char *ele, uint32_t size) {
|
||||
uint64_t listpack_bytes = lpGetTotalBytes(lp);
|
||||
unsigned char *eofptr = lp + listpack_bytes - 1;
|
||||
return lpInsert(lp,ele,size,eofptr,LP_BEFORE,NULL);
|
||||
}
|
||||
|
||||
/* Remove the element pointed by 'p', and return the resulting listpack.
|
||||
* If 'newp' is not NULL, the next element pointer (to the right of the
|
||||
* deleted one) is returned by reference. If the deleted element was the
|
||||
* last one, '*newp' is set to NULL. */
|
||||
unsigned char *lpDelete(unsigned char *lp, unsigned char *p, unsigned char **newp) {
|
||||
return lpInsert(lp,NULL,0,p,LP_REPLACE,newp);
|
||||
}
|
||||
|
||||
/* Return the total number of bytes the listpack is composed of. */
|
||||
uint32_t lpBytes(unsigned char *lp) {
|
||||
return lpGetTotalBytes(lp);
|
||||
}
|
||||
|
||||
/* Seek the specified element and returns the pointer to the seeked element.
|
||||
* Positive indexes specify the zero-based element to seek from the head to
|
||||
* the tail, negative indexes specify elements starting from the tail, where
|
||||
* -1 means the last element, -2 the penultimate and so forth. If the index
|
||||
* is out of range, NULL is returned. */
|
||||
unsigned char *lpSeek(unsigned char *lp, long index) {
|
||||
int forward = 1; /* Seek forward by default. */
|
||||
|
||||
/* We want to seek from left to right or the other way around
|
||||
* depending on the listpack length and the element position.
|
||||
* However if the listpack length cannot be obtained in constant time,
|
||||
* we always seek from left to right. */
|
||||
uint32_t numele = lpGetNumElements(lp);
|
||||
if (numele != LP_HDR_NUMELE_UNKNOWN) {
|
||||
if (index < 0) index = (long)numele+index;
|
||||
if (index < 0) return NULL; /* Index still < 0 means out of range. */
|
||||
if (index >= numele) return NULL; /* Out of range the other side. */
|
||||
/* We want to scan right-to-left if the element we are looking for
|
||||
* is past the half of the listpack. */
|
||||
if (index > numele/2) {
|
||||
forward = 0;
|
||||
/* Left to right scanning always expects a negative index. Convert
|
||||
* our index to negative form. */
|
||||
index -= numele;
|
||||
}
|
||||
} else {
|
||||
/* If the listpack length is unspecified, for negative indexes we
|
||||
* want to always scan left-to-right. */
|
||||
if (index < 0) forward = 0;
|
||||
}
|
||||
|
||||
/* Forward and backward scanning is trivially based on lpNext()/lpPrev(). */
|
||||
if (forward) {
|
||||
unsigned char *ele = lpFirst(lp);
|
||||
while (index > 0 && ele) {
|
||||
ele = lpNext(lp,ele);
|
||||
index--;
|
||||
}
|
||||
return ele;
|
||||
} else {
|
||||
unsigned char *ele = lpLast(lp);
|
||||
while (index < -1 && ele) {
|
||||
ele = lpPrev(lp,ele);
|
||||
index++;
|
||||
}
|
||||
return ele;
|
||||
}
|
||||
}
|
||||
|
61
src/listpack.h
Normal file
61
src/listpack.h
Normal file
@ -0,0 +1,61 @@
|
||||
/* Listpack -- A lists of strings serialization format
|
||||
*
|
||||
* This file implements the specification you can find at:
|
||||
*
|
||||
* https://github.com/antirez/listpack
|
||||
*
|
||||
* Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com>
|
||||
* All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright notice,
|
||||
* this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above copyright
|
||||
* notice, this list of conditions and the following disclaimer in the
|
||||
* documentation and/or other materials provided with the distribution.
|
||||
* * Neither the name of Redis nor the names of its contributors may be used
|
||||
* to endorse or promote products derived from this software without
|
||||
* specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||||
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
||||
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
||||
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
||||
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
* POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
#ifndef __LISTPACK_H
|
||||
#define __LISTPACK_H
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
#define LP_INTBUF_SIZE 21 /* 20 digits of -2^63 + 1 null term = 21. */
|
||||
|
||||
/* lpInsert() where argument possible values: */
|
||||
#define LP_BEFORE 0
|
||||
#define LP_AFTER 1
|
||||
#define LP_REPLACE 2
|
||||
|
||||
unsigned char *lpNew(void);
|
||||
void lpFree(unsigned char *lp);
|
||||
unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, unsigned char *p, int where, unsigned char **newp);
|
||||
unsigned char *lpAppend(unsigned char *lp, unsigned char *ele, uint32_t size);
|
||||
unsigned char *lpDelete(unsigned char *lp, unsigned char *p, unsigned char **newp);
|
||||
uint32_t lpLength(unsigned char *lp);
|
||||
unsigned char *lpGet(unsigned char *p, int64_t *count, unsigned char *intbuf);
|
||||
unsigned char *lpFirst(unsigned char *lp);
|
||||
unsigned char *lpLast(unsigned char *lp);
|
||||
unsigned char *lpNext(unsigned char *lp, unsigned char *p);
|
||||
unsigned char *lpPrev(unsigned char *lp, unsigned char *p);
|
||||
uint32_t lpBytes(unsigned char *lp);
|
||||
unsigned char *lpSeek(unsigned char *lp, long index);
|
||||
|
||||
#endif
|
44
src/listpack_malloc.h
Normal file
44
src/listpack_malloc.h
Normal file
@ -0,0 +1,44 @@
|
||||
/* Listpack -- A lists of strings serialization format
|
||||
* https://github.com/antirez/listpack
|
||||
*
|
||||
* Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com>
|
||||
* All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright notice,
|
||||
* this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above copyright
|
||||
* notice, this list of conditions and the following disclaimer in the
|
||||
* documentation and/or other materials provided with the distribution.
|
||||
* * Neither the name of Redis nor the names of its contributors may be used
|
||||
* to endorse or promote products derived from this software without
|
||||
* specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||||
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
||||
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
||||
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
||||
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
* POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
/* Allocator selection.
|
||||
*
|
||||
* This file is used in order to change the Rax allocator at compile time.
|
||||
* Just define the following defines to what you want to use. Also add
|
||||
* the include of your alternate allocator if needed (not needed in order
|
||||
* to use the default libc allocator). */
|
||||
|
||||
#ifndef LISTPACK_ALLOC_H
|
||||
#define LISTPACK_ALLOC_H
|
||||
#define lp_malloc malloc
|
||||
#define lp_realloc realloc
|
||||
#define lp_free free
|
||||
#endif
|
@ -232,6 +232,13 @@ robj *createZsetZiplistObject(void) {
|
||||
return o;
|
||||
}
|
||||
|
||||
robj *createStreamObject(void) {
|
||||
stream *s = streamNew();
|
||||
robj *o = createObject(OBJ_STREAM,s);
|
||||
o->encoding = OBJ_ENCODING_STREAM;
|
||||
return o;
|
||||
}
|
||||
|
||||
robj *createModuleObject(moduleType *mt, void *value) {
|
||||
moduleValue *mv = zmalloc(sizeof(*mv));
|
||||
mv->type = mt;
|
||||
|
24
src/rax.c
24
src/rax.c
@ -131,7 +131,7 @@ static inline void raxStackFree(raxStack *ts) {
|
||||
}
|
||||
|
||||
/* ----------------------------------------------------------------------------
|
||||
* Radis tree implementation
|
||||
* Radix tree implementation
|
||||
* --------------------------------------------------------------------------*/
|
||||
|
||||
/* Allocate a new non compressed node with the specified number of children.
|
||||
@ -873,7 +873,8 @@ raxNode *raxRemoveChild(raxNode *parent, raxNode *child) {
|
||||
memmove(((char*)cp)-1,cp,(parent->size-taillen-1)*sizeof(raxNode**));
|
||||
|
||||
/* Move the remaining "tail" pointer at the right position as well. */
|
||||
memmove(((char*)c)-1,c+1,taillen*sizeof(raxNode**)+parent->iskey*sizeof(void*));
|
||||
size_t valuelen = (parent->iskey && !parent->isnull) ? sizeof(void*) : 0;
|
||||
memmove(((char*)c)-1,c+1,taillen*sizeof(raxNode**)+valuelen);
|
||||
|
||||
/* 4. Update size. */
|
||||
parent->size--;
|
||||
@ -1175,7 +1176,7 @@ void raxIteratorDelChars(raxIterator *it, size_t count) {
|
||||
* The function returns 1 on success or 0 on out of memory. */
|
||||
int raxIteratorNextStep(raxIterator *it, int noup) {
|
||||
if (it->flags & RAX_ITER_EOF) {
|
||||
return 0;
|
||||
return 1;
|
||||
} else if (it->flags & RAX_ITER_JUST_SEEKED) {
|
||||
it->flags &= ~RAX_ITER_JUST_SEEKED;
|
||||
return 1;
|
||||
@ -1187,10 +1188,6 @@ int raxIteratorNextStep(raxIterator *it, int noup) {
|
||||
size_t orig_stack_items = it->stack.items;
|
||||
raxNode *orig_node = it->node;
|
||||
|
||||
/* Clear the EOF flag: it will be set again if the EOF condition
|
||||
* is still valid. */
|
||||
it->flags &= ~RAX_ITER_EOF;
|
||||
|
||||
while(1) {
|
||||
int children = it->node->iscompr ? 1 : it->node->size;
|
||||
if (!noup && children) {
|
||||
@ -1291,7 +1288,7 @@ int raxSeekGreatest(raxIterator *it) {
|
||||
* effect to the one of raxIteratorPrevSte(). */
|
||||
int raxIteratorPrevStep(raxIterator *it, int noup) {
|
||||
if (it->flags & RAX_ITER_EOF) {
|
||||
return 0;
|
||||
return 1;
|
||||
} else if (it->flags & RAX_ITER_JUST_SEEKED) {
|
||||
it->flags &= ~RAX_ITER_JUST_SEEKED;
|
||||
return 1;
|
||||
@ -1412,6 +1409,7 @@ int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len) {
|
||||
it->node = it->rt->head;
|
||||
if (!raxSeekGreatest(it)) return 0;
|
||||
assert(it->node->iskey);
|
||||
it->data = raxGetData(it->node);
|
||||
return 1;
|
||||
}
|
||||
|
||||
@ -1430,6 +1428,7 @@ int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len) {
|
||||
/* We found our node, since the key matches and we have an
|
||||
* "equal" condition. */
|
||||
if (!raxIteratorAddChars(it,ele,len)) return 0; /* OOM. */
|
||||
it->data = raxGetData(it->node);
|
||||
} else if (lt || gt) {
|
||||
/* Exact key not found or eq flag not set. We have to set as current
|
||||
* key the one represented by the node we stopped at, and perform
|
||||
@ -1502,6 +1501,7 @@ int raxSeek(raxIterator *it, const char *op, unsigned char *ele, size_t len) {
|
||||
* the previous sub-tree. */
|
||||
if (nodechar < keychar) {
|
||||
if (!raxSeekGreatest(it)) return 0;
|
||||
it->data = raxGetData(it->node);
|
||||
} else {
|
||||
if (!raxIteratorAddChars(it,it->node->data,it->node->size))
|
||||
return 0;
|
||||
@ -1647,6 +1647,14 @@ void raxStop(raxIterator *it) {
|
||||
raxStackFree(&it->stack);
|
||||
}
|
||||
|
||||
/* Return if the iterator is in an EOF state. This happens when raxSeek()
|
||||
* failed to seek an appropriate element, so that raxNext() or raxPrev()
|
||||
* will return zero, or when an EOF condition was reached while iterating
|
||||
* with raxNext() and raxPrev(). */
|
||||
int raxEOF(raxIterator *it) {
|
||||
return it->flags & RAX_ITER_EOF;
|
||||
}
|
||||
|
||||
/* ----------------------------- Introspection ------------------------------ */
|
||||
|
||||
/* This function is mostly used for debugging and learning purposes.
|
||||
|
@ -155,6 +155,7 @@ int raxPrev(raxIterator *it);
|
||||
int raxRandomWalk(raxIterator *it, size_t steps);
|
||||
int raxCompare(raxIterator *iter, const char *op, unsigned char *key, size_t key_len);
|
||||
void raxStop(raxIterator *it);
|
||||
int raxEOF(raxIterator *it);
|
||||
void raxShow(rax *rax);
|
||||
|
||||
#endif
|
||||
|
@ -89,10 +89,11 @@
|
||||
#define RDB_TYPE_ZSET_ZIPLIST 12
|
||||
#define RDB_TYPE_HASH_ZIPLIST 13
|
||||
#define RDB_TYPE_LIST_QUICKLIST 14
|
||||
#define RDB_TYPE_STREAM_LISTPACKS 15
|
||||
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */
|
||||
|
||||
/* Test if a type is an object type. */
|
||||
#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 14))
|
||||
#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 15))
|
||||
|
||||
/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
|
||||
#define RDB_OPCODE_AUX 250
|
||||
|
@ -302,6 +302,8 @@ struct redisCommand redisCommandTable[] = {
|
||||
{"pfcount",pfcountCommand,-2,"r",0,NULL,1,-1,1,0,0},
|
||||
{"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0},
|
||||
{"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0},
|
||||
{"xadd",xaddCommand,-4,"wmF",0,NULL,1,1,1,0,0},
|
||||
{"xrange",xrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
|
||||
{"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
|
||||
{"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
|
||||
{"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}
|
||||
|
@ -59,6 +59,7 @@ typedef long long mstime_t; /* millisecond time type. */
|
||||
#include "anet.h" /* Networking the easy way */
|
||||
#include "ziplist.h" /* Compact list data structure */
|
||||
#include "intset.h" /* Compact integer set structure */
|
||||
#include "stream.h" /* Stream data type header file. */
|
||||
#include "version.h" /* Version macro */
|
||||
#include "util.h" /* Misc functions useful in many places */
|
||||
#include "latency.h" /* Latency monitor API */
|
||||
@ -451,6 +452,7 @@ typedef long long mstime_t; /* millisecond time type. */
|
||||
#define OBJ_SET 2
|
||||
#define OBJ_ZSET 3
|
||||
#define OBJ_HASH 4
|
||||
#define OBJ_STREAM 5
|
||||
|
||||
/* The "module" object type is a special one that signals that the object
|
||||
* is one directly managed by a Redis module. In this case the value points
|
||||
@ -575,6 +577,7 @@ typedef struct RedisModuleDigest {
|
||||
#define OBJ_ENCODING_SKIPLIST 7 /* Encoded as skiplist */
|
||||
#define OBJ_ENCODING_EMBSTR 8 /* Embedded sds string encoding */
|
||||
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */
|
||||
#define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree of listpacks */
|
||||
|
||||
#define LRU_BITS 24
|
||||
#define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of obj->lru */
|
||||
@ -1414,6 +1417,9 @@ void handleClientsBlockedOnLists(void);
|
||||
void popGenericCommand(client *c, int where);
|
||||
void signalListAsReady(redisDb *db, robj *key);
|
||||
|
||||
/* Stream data type. */
|
||||
stream *streamNew(void);
|
||||
|
||||
/* MULTI/EXEC/WATCH... */
|
||||
void unwatchAllKeys(client *c);
|
||||
void initClientMultiState(client *c);
|
||||
@ -1455,6 +1461,7 @@ robj *createIntsetObject(void);
|
||||
robj *createHashObject(void);
|
||||
robj *createZsetObject(void);
|
||||
robj *createZsetZiplistObject(void);
|
||||
robj *createStreamObject(void);
|
||||
robj *createModuleObject(moduleType *mt, void *value);
|
||||
int getLongFromObjectOrReply(client *c, robj *o, long *target, const char *msg);
|
||||
int checkType(client *c, robj *o, int type);
|
||||
@ -1992,6 +1999,8 @@ void pfdebugCommand(client *c);
|
||||
void latencyCommand(client *c);
|
||||
void moduleCommand(client *c);
|
||||
void securityWarningCommand(client *c);
|
||||
void xaddCommand(client *c);
|
||||
void xrangeCommand(client *c);
|
||||
|
||||
#if defined(__GNUC__)
|
||||
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
|
||||
|
21
src/stream.h
Normal file
21
src/stream.h
Normal file
@ -0,0 +1,21 @@
|
||||
#ifndef STREAM_H
|
||||
#define STREAM_H
|
||||
|
||||
#include "rax.h"
|
||||
|
||||
/* Stream item ID: a 128 bit number composed of a milliseconds time and
|
||||
* a sequence counter. IDs generated in the same millisecond (or in a past
|
||||
* millisecond if the clock jumped backward) will use the millisecond time
|
||||
* of the latest generated ID and an incremented sequence. */
|
||||
typedef struct streamID {
|
||||
uint64_t ms; /* Unix time in milliseconds. */
|
||||
uint64_t seq; /* Sequence number. */
|
||||
} streamID;
|
||||
|
||||
typedef struct stream {
|
||||
rax *rax; /* The radix tree holding the stream. */
|
||||
uint64_t length; /* Number of elements inside this stream. */
|
||||
streamID last_id; /* Zero if there are yet no items. */
|
||||
} stream;
|
||||
|
||||
#endif
|
376
src/t_stream.c
Normal file
376
src/t_stream.c
Normal file
@ -0,0 +1,376 @@
|
||||
/*
|
||||
* Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com>
|
||||
* All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright notice,
|
||||
* this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above copyright
|
||||
* notice, this list of conditions and the following disclaimer in the
|
||||
* documentation and/or other materials provided with the distribution.
|
||||
* * Neither the name of Redis nor the names of its contributors may be used
|
||||
* to endorse or promote products derived from this software without
|
||||
* specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
||||
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
||||
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
||||
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
||||
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
||||
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
||||
* POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
/* TODO:
|
||||
* - After loading a stream, populate the last ID.
|
||||
*/
|
||||
|
||||
#include "server.h"
|
||||
#include "listpack.h"
|
||||
#include "endianconv.h"
|
||||
#include "stream.h"
|
||||
|
||||
#define STREAM_BYTES_PER_LISTPACK 4096
|
||||
|
||||
/* -----------------------------------------------------------------------
|
||||
* Low level stream encoding: a radix tree of listpacks.
|
||||
* ----------------------------------------------------------------------- */
|
||||
|
||||
/* Create a new stream data structure. */
|
||||
stream *streamNew(void) {
|
||||
stream *s = zmalloc(sizeof(*s));
|
||||
s->rax = raxNew();
|
||||
s->length = 0;
|
||||
s->last_id.ms = 0;
|
||||
s->last_id.seq = 0;
|
||||
return s;
|
||||
}
|
||||
|
||||
/* Generate the next stream item ID given the previous one. If the current
|
||||
* milliseconds Unix time is greater than the previous one, just use this
|
||||
* as time part and start with sequence part of zero. Otherwise we use the
|
||||
* previous time (and never go backward) and increment the sequence. */
|
||||
void streamNextID(streamID *last_id, streamID *new_id) {
|
||||
uint64_t ms = mstime();
|
||||
if (ms > last_id->ms) {
|
||||
new_id->ms = ms;
|
||||
new_id->seq = 0;
|
||||
} else {
|
||||
new_id->ms = last_id->ms;
|
||||
new_id->seq = last_id->seq+1;
|
||||
}
|
||||
}
|
||||
|
||||
/* This is just a wrapper for lpAppend() to directly use a 64 bit integer
|
||||
* instead of a string. */
|
||||
unsigned char *lpAppendInteger(unsigned char *lp, int64_t value) {
|
||||
char buf[LONG_STR_SIZE];
|
||||
int slen = ll2string(buf,sizeof(buf),value);
|
||||
return lpAppend(lp,(unsigned char*)buf,slen);
|
||||
}
|
||||
|
||||
/* This is a wrapper function for lpGet() to directly get an integer value
|
||||
* from the listpack (that may store numbers as a string), converting
|
||||
* the string if needed. */
|
||||
int64_t lpGetInteger(unsigned char *ele) {
|
||||
int64_t v;
|
||||
unsigned char *e = lpGet(ele,&v,NULL);
|
||||
if (e == NULL) return v;
|
||||
/* The following code path should never be used for how listpacks work:
|
||||
* they should always be able to store an int64_t value in integer
|
||||
* encoded form. However the implementation may change. */
|
||||
int retval = string2ll((char*)e,v,&v);
|
||||
serverAssert(retval != 0);
|
||||
return v;
|
||||
}
|
||||
|
||||
/* Convert the specified stream entry ID as a 128 bit big endian number, so
|
||||
* that the IDs can be sorted lexicographically. */
|
||||
void streamEncodeID(void *buf, streamID *id) {
|
||||
uint64_t e[2];
|
||||
e[0] = htonu64(id->ms);
|
||||
e[1] = htonu64(id->seq);
|
||||
memcpy(buf,e,sizeof(e));
|
||||
}
|
||||
|
||||
/* This is the reverse of streamEncodeID(): the decoded ID will be stored
|
||||
* in the 'id' structure passed by reference. The buffer 'buf' must point
|
||||
* to a 128 bit big-endian encoded ID. */
|
||||
void streamDecodeID(void *buf, streamID *id) {
|
||||
uint64_t e[2];
|
||||
memcpy(e,buf,sizeof(e));
|
||||
id->ms = ntohu64(e[0]);
|
||||
id->seq = ntohu64(e[1]);
|
||||
}
|
||||
|
||||
/* Adds a new item into the stream 's' having the specified number of
|
||||
* field-value pairs as specified in 'numfields' and stored into 'argv'.
|
||||
* Returns the new entry ID populating the 'added_id' structure. */
|
||||
void streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id) {
|
||||
raxIterator ri;
|
||||
raxStart(&ri,s->rax);
|
||||
raxSeek(&ri,"$",NULL,0);
|
||||
|
||||
size_t lp_bytes = 0; /* Total bytes in the tail listpack. */
|
||||
unsigned char *lp = NULL; /* Tail listpack pointer. */
|
||||
|
||||
/* Get a reference to the tail node listpack. */
|
||||
if (raxNext(&ri)) {
|
||||
lp = ri.data;
|
||||
lp_bytes = lpBytes(lp);
|
||||
}
|
||||
raxStop(&ri);
|
||||
|
||||
/* Generate the new entry ID. */
|
||||
streamID id;
|
||||
streamNextID(&s->last_id,&id);
|
||||
|
||||
/* We have to add the key into the radix tree in lexicographic order,
|
||||
* to do so we consider the ID as a single 128 bit number written in
|
||||
* big endian, so that the most significant bytes are the first ones. */
|
||||
uint64_t rax_key[2]; /* Key in the radix tree containing the listpack.*/
|
||||
uint64_t entry_id[2]; /* Entry ID of the new item as 128 bit string. */
|
||||
streamEncodeID(entry_id,&id);
|
||||
|
||||
/* Create a new listpack and radix tree node if needed. */
|
||||
if (lp == NULL || lp_bytes > STREAM_BYTES_PER_LISTPACK) {
|
||||
lp = lpNew();
|
||||
rax_key[0] = entry_id[0];
|
||||
rax_key[1] = entry_id[1];
|
||||
raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
|
||||
} else {
|
||||
serverAssert(ri.key_len == sizeof(rax_key));
|
||||
memcpy(rax_key,ri.key,sizeof(rax_key));
|
||||
}
|
||||
|
||||
/* Populate the listpack with the new entry. */
|
||||
lp = lpAppend(lp,(unsigned char*)entry_id,sizeof(entry_id));
|
||||
lp = lpAppendInteger(lp,numfields);
|
||||
for (int i = 0; i < numfields; i++) {
|
||||
sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr;
|
||||
lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
|
||||
lp = lpAppend(lp,(unsigned char*)value,sdslen(value));
|
||||
}
|
||||
|
||||
/* Insert back into the tree in order to update the listpack pointer. */
|
||||
raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
|
||||
s->length++;
|
||||
s->last_id = id;
|
||||
if (added_id) *added_id = id;
|
||||
raxShow(s->rax);
|
||||
}
|
||||
|
||||
/* Send the specified range to the client 'c'. The range the client will
|
||||
* receive is between start and end inclusive, if 'count' is non zero, no more
|
||||
* than 'count' elemnets are sent. The 'end' pointer can be NULL to mean that
|
||||
* we want all the elements from 'start' till the end of the stream. */
|
||||
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count) {
|
||||
void *arraylen_ptr = addDeferredMultiBulkLength(c);
|
||||
size_t arraylen = 0;
|
||||
|
||||
/* Seek the radix tree node that contains our start item. */
|
||||
uint64_t key[2];
|
||||
uint64_t end_key[2];
|
||||
streamEncodeID(key,start);
|
||||
if (end) streamEncodeID(end_key,end);
|
||||
raxIterator ri;
|
||||
raxStart(&ri,s->rax);
|
||||
|
||||
/* Seek the correct node in the radix tree. */
|
||||
if (start->ms || start->seq) {
|
||||
raxSeek(&ri,"<=",(unsigned char*)key,sizeof(key));
|
||||
if (raxEOF(&ri)) raxSeek(&ri,">",(unsigned char*)key,sizeof(key));
|
||||
} else {
|
||||
raxSeek(&ri,"^",NULL,0);
|
||||
}
|
||||
|
||||
/* For every radix tree node, iterate the corresponding listpack,
|
||||
* returning elmeents when they are within range. */
|
||||
while (raxNext(&ri)) {
|
||||
serverAssert(ri.key_len == sizeof(key));
|
||||
unsigned char *lp = ri.data;
|
||||
unsigned char *lp_ele = lpFirst(lp);
|
||||
while(lp_ele) {
|
||||
int64_t e_len;
|
||||
unsigned char buf[LP_INTBUF_SIZE];
|
||||
unsigned char *e = lpGet(lp_ele,&e_len,buf);
|
||||
serverAssert(e_len == sizeof(streamID));
|
||||
|
||||
/* Seek next field: number of elements. */
|
||||
lp_ele = lpNext(lp,lp_ele);
|
||||
if (memcmp(e,key,sizeof(key)) >= 0) { /* If current >= start */
|
||||
if (end && memcmp(e,end_key,sizeof(key)) > 0) {
|
||||
break; /* We are already out of range. */
|
||||
}
|
||||
streamID thisid;
|
||||
streamDecodeID(e,&thisid);
|
||||
sds replyid = sdscatfmt(sdsempty(),"+%U.%U\r\n",
|
||||
thisid.ms,thisid.seq);
|
||||
|
||||
/* Emit this stream entry in the client output. */
|
||||
addReplyMultiBulkLen(c,2);
|
||||
addReplySds(c,replyid);
|
||||
int64_t numfields = lpGetInteger(lp_ele);
|
||||
lp_ele = lpNext(lp,lp_ele);
|
||||
addReplyMultiBulkLen(c,numfields*2);
|
||||
for (int64_t i = 0; i < numfields; i++) {
|
||||
/* Emit two items (key-value) per iteration. */
|
||||
for (int k = 0; k < 2; k++) {
|
||||
e = lpGet(lp_ele,&e_len,buf);
|
||||
addReplyBulkCBuffer(c,e,e_len);
|
||||
lp_ele = lpNext(lp,lp_ele);
|
||||
}
|
||||
}
|
||||
|
||||
arraylen++;
|
||||
if (count && count == arraylen) break;
|
||||
} else {
|
||||
/* If we do not emit, we have to discard. */
|
||||
int64_t numfields = lpGetInteger(lp_ele);
|
||||
lp_ele = lpNext(lp,lp_ele);
|
||||
for (int64_t i = 0; i < numfields*2; i++)
|
||||
lp_ele = lpNext(lp,lp_ele);
|
||||
}
|
||||
}
|
||||
if (count && count == arraylen) break;
|
||||
}
|
||||
raxStop(&ri);
|
||||
setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
|
||||
return arraylen;
|
||||
}
|
||||
|
||||
/* -----------------------------------------------------------------------
|
||||
* Stream commands implementation
|
||||
* ----------------------------------------------------------------------- */
|
||||
|
||||
/* Look the stream at 'key' and return the corresponding stream object.
|
||||
* The function creates a key setting it to an empty stream if needed. */
|
||||
robj *streamTypeLookupWriteOrCreate(client *c, robj *key) {
|
||||
robj *o = lookupKeyWrite(c->db,key);
|
||||
if (o == NULL) {
|
||||
o = createStreamObject();
|
||||
dbAdd(c->db,key,o);
|
||||
} else {
|
||||
if (o->type != OBJ_STREAM) {
|
||||
addReply(c,shared.wrongtypeerr);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
return o;
|
||||
}
|
||||
|
||||
/* Helper function to convert a string to an unsigned long long value.
|
||||
* The function attempts to use the faster string2ll() function inside
|
||||
* Redis: if it fails, strtoull() is used instead. The function returns
|
||||
* 1 if the conversion happened successfully or 0 if the number is
|
||||
* invalid or out of range. */
|
||||
int string2ull(const char *s, unsigned long long *value) {
|
||||
long long ll;
|
||||
if (string2ll(s,strlen(s),&ll)) {
|
||||
if (ll < 0) return 0; /* Negative values are out of range. */
|
||||
*value = ll;
|
||||
return 1;
|
||||
}
|
||||
errno = 0;
|
||||
*value = strtoull(s,NULL,10);
|
||||
if (errno == EINVAL || errno == ERANGE) return 0; /* strtoull() failed. */
|
||||
return 1; /* Conversion done! */
|
||||
}
|
||||
|
||||
/* Parse a stream ID in the format given by clients to Redis, that is
|
||||
* <ms>.<seq>, and converts it into a streamID structure. If
|
||||
* the specified ID is invalid C_ERR is returned and an error is reported
|
||||
* to the client, otherwise C_OK is returned. The ID may be in incomplete
|
||||
* form, just stating the milliseconds time part of the stream. In such a case
|
||||
* the missing part is set according to the value of 'missing_seq' parameter.
|
||||
* The IDs "-" and "+" specify respectively the minimum and maximum IDs
|
||||
* that can be represented. */
|
||||
int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {
|
||||
char buf[128];
|
||||
if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid;
|
||||
memcpy(buf,o->ptr,sdslen(o->ptr)+1);
|
||||
|
||||
/* Handle the "-" and "+" special cases. */
|
||||
if (buf[0] == '-' && buf[1] == '\0') {
|
||||
id->ms = 0;
|
||||
id->seq = 0;
|
||||
return C_OK;
|
||||
} else if (buf[0] == '+' && buf[1] == '\0') {
|
||||
id->ms = UINT64_MAX;
|
||||
id->seq = UINT64_MAX;
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
/* Parse <ms>.<seq> form. */
|
||||
char *dot = strchr(buf,'.');
|
||||
if (dot) *dot = '\0';
|
||||
uint64_t ms, seq;
|
||||
if (string2ull(buf,&ms) == 0) goto invalid;
|
||||
if (dot && string2ull(dot+1,&seq) == 0) goto invalid;
|
||||
if (!dot) seq = missing_seq;
|
||||
id->ms = ms;
|
||||
id->seq = seq;
|
||||
return C_OK;
|
||||
|
||||
invalid:
|
||||
addReplyError(c,"Invalid stream ID specified as stream command argument");
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
/* XADD key [field value] [field value] ... */
|
||||
void xaddCommand(client *c) {
|
||||
if ((c->argc % 2) == 1) {
|
||||
addReplyError(c,"wrong number of arguments for XADD");
|
||||
return;
|
||||
}
|
||||
|
||||
/* Lookup the stream at key. */
|
||||
robj *o;
|
||||
stream *s;
|
||||
if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
|
||||
s = o->ptr;
|
||||
|
||||
/* Append using the low level function and return the ID. */
|
||||
streamID id;
|
||||
streamAppendItem(s,c->argv+2,(c->argc-2)/2,&id);
|
||||
sds reply = sdscatfmt(sdsempty(),"+%U.%U\r\n",id.ms,id.seq);
|
||||
addReplySds(c,reply);
|
||||
|
||||
signalModifiedKey(c->db,c->argv[1]);
|
||||
notifyKeyspaceEvent(NOTIFY_HASH,"xadd",c->argv[1],c->db->id);
|
||||
server.dirty++;
|
||||
}
|
||||
|
||||
/* XRANGE key start end [COUNT <n>] */
|
||||
void xrangeCommand(client *c) {
|
||||
robj *o;
|
||||
stream *s;
|
||||
streamID startid, endid;
|
||||
long long count = 0;
|
||||
|
||||
if (streamParseIDOrReply(c,c->argv[2],&startid,0) == C_ERR) return;
|
||||
if (streamParseIDOrReply(c,c->argv[3],&endid,UINT64_MAX) == C_ERR) return;
|
||||
|
||||
/* Parse the COUNT option if any. */
|
||||
if (c->argc > 4) {
|
||||
if (strcasecmp(c->argv[4]->ptr,"COUNT") == 0) {
|
||||
if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) != C_OK)
|
||||
return;
|
||||
} else {
|
||||
addReply(c,shared.syntaxerr);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/* Return the specified range to the user. */
|
||||
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
|
||||
|| checkType(c,o,OBJ_STREAM)) return;
|
||||
s = o->ptr;
|
||||
streamReplyWithRange(c,s,&startid,&endid,count);
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user