mirror of
https://github.com/fluencelabs/redis
synced 2025-03-19 17:10:50 +00:00
Streams: AOF rewriting + minor iterator improvements.
This commit is contained in:
parent
01ea018c40
commit
26d4f8e3ec
33
src/aof.c
33
src/aof.c
@ -1031,6 +1031,37 @@ int rewriteHashObject(rio *r, robj *key, robj *o) {
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Emit the commands needed to rebuild a stream object.
|
||||||
|
* The function returns 0 on error, 1 on success. */
|
||||||
|
int rewriteStreamObject(rio *r, robj *key, robj *o) {
|
||||||
|
streamIterator si;
|
||||||
|
streamIteratorStart(&si,o->ptr,NULL,NULL);
|
||||||
|
streamID id;
|
||||||
|
int64_t numfields;
|
||||||
|
|
||||||
|
while(streamIteratorGetID(&si,&id,&numfields)) {
|
||||||
|
/* Emit a two elements array for each item. The first is
|
||||||
|
* the ID, the second is an array of field-value pairs. */
|
||||||
|
|
||||||
|
/* Emit the XADD <key> <id> ...fields... command. */
|
||||||
|
if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0;
|
||||||
|
if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
|
||||||
|
if (rioWriteBulkObject(r,key) == 0) return 0;
|
||||||
|
sds replyid = sdscatfmt(sdsempty(),"%U.%U",id.ms,id.seq);
|
||||||
|
if (rioWriteBulkString(r,replyid,sdslen(replyid)) == 0) return 0;
|
||||||
|
sdsfree(replyid);
|
||||||
|
while(numfields--) {
|
||||||
|
unsigned char *field, *value;
|
||||||
|
int64_t field_len, value_len;
|
||||||
|
streamIteratorGetField(&si,&field,&value,&field_len,&value_len);
|
||||||
|
if (rioWriteBulkString(r,(char*)field,field_len) == 0) return 0;
|
||||||
|
if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
streamIteratorStop(&si);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
/* Call the module type callback in order to rewrite a data type
|
/* Call the module type callback in order to rewrite a data type
|
||||||
* that is exported by a module and is not handled by Redis itself.
|
* that is exported by a module and is not handled by Redis itself.
|
||||||
* The function returns 0 on error, 1 on success. */
|
* The function returns 0 on error, 1 on success. */
|
||||||
@ -1111,6 +1142,8 @@ int rewriteAppendOnlyFileRio(rio *aof) {
|
|||||||
if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
|
if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
|
||||||
} else if (o->type == OBJ_HASH) {
|
} else if (o->type == OBJ_HASH) {
|
||||||
if (rewriteHashObject(aof,&key,o) == 0) goto werr;
|
if (rewriteHashObject(aof,&key,o) == 0) goto werr;
|
||||||
|
} else if (o->type == OBJ_STREAM) {
|
||||||
|
if (rewriteStreamObject(aof,&key,o) == 0) goto werr;
|
||||||
} else if (o->type == OBJ_MODULE) {
|
} else if (o->type == OBJ_MODULE) {
|
||||||
if (rewriteModuleObject(aof,&key,o) == 0) goto werr;
|
if (rewriteModuleObject(aof,&key,o) == 0) goto werr;
|
||||||
} else {
|
} else {
|
||||||
|
@ -218,19 +218,23 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
|
|||||||
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end) {
|
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end) {
|
||||||
/* Intialize the iterator and translates the iteration start/stop
|
/* Intialize the iterator and translates the iteration start/stop
|
||||||
* elements into a 128 big big-endian number. */
|
* elements into a 128 big big-endian number. */
|
||||||
streamEncodeID(si->start_key,start);
|
if (start) {
|
||||||
|
streamEncodeID(si->start_key,start);
|
||||||
|
} else {
|
||||||
|
si->start_key[0] = 0;
|
||||||
|
si->start_key[0] = 0;
|
||||||
|
}
|
||||||
|
|
||||||
if (end) {
|
if (end) {
|
||||||
streamEncodeID(si->end_key,end);
|
streamEncodeID(si->end_key,end);
|
||||||
} else {
|
} else {
|
||||||
/* We assume that UINT64_MAX is the same in little and big
|
|
||||||
* endian, that is, all bits set. */
|
|
||||||
si->end_key[0] = UINT64_MAX;
|
si->end_key[0] = UINT64_MAX;
|
||||||
si->end_key[0] = UINT64_MAX;
|
si->end_key[0] = UINT64_MAX;
|
||||||
}
|
}
|
||||||
raxStart(&si->ri,s->rax);
|
|
||||||
|
|
||||||
/* Seek the correct node in the radix tree. */
|
/* Seek the correct node in the radix tree. */
|
||||||
if (start->ms || start->seq) {
|
raxStart(&si->ri,s->rax);
|
||||||
|
if (start && (start->ms || start->seq)) {
|
||||||
raxSeek(&si->ri,"<=",(unsigned char*)si->start_key,
|
raxSeek(&si->ri,"<=",(unsigned char*)si->start_key,
|
||||||
sizeof(si->start_key));
|
sizeof(si->start_key));
|
||||||
if (raxEOF(&si->ri))
|
if (raxEOF(&si->ri))
|
||||||
|
Loading…
x
Reference in New Issue
Block a user