mirror of
https://github.com/fluencelabs/redis
synced 2025-03-18 16:40:50 +00:00
Stream & AOF: rewrite stream in correct way
This commit is contained in:
parent
5f3adbee33
commit
b3e80d2f65
46
src/aof.c
46
src/aof.c
@ -1121,23 +1121,39 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
|
||||
streamID id;
|
||||
int64_t numfields;
|
||||
|
||||
/* Reconstruct the stream data using XADD commands. */
|
||||
while(streamIteratorGetID(&si,&id,&numfields)) {
|
||||
/* Emit a two elements array for each item. The first is
|
||||
* the ID, the second is an array of field-value pairs. */
|
||||
if (s->length) {
|
||||
/* Reconstruct the stream data using XADD commands. */
|
||||
while(streamIteratorGetID(&si,&id,&numfields)) {
|
||||
/* Emit a two elements array for each item. The first is
|
||||
* the ID, the second is an array of field-value pairs. */
|
||||
|
||||
/* Emit the XADD <key> <id> ...fields... command. */
|
||||
if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0;
|
||||
if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
|
||||
if (rioWriteBulkObject(r,key) == 0) return 0;
|
||||
if (rioWriteBulkStreamID(r,&id) == 0) return 0;
|
||||
while(numfields--) {
|
||||
unsigned char *field, *value;
|
||||
int64_t field_len, value_len;
|
||||
streamIteratorGetField(&si,&field,&value,&field_len,&value_len);
|
||||
if (rioWriteBulkString(r,(char*)field,field_len) == 0) return 0;
|
||||
if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0;
|
||||
/* Emit the XADD <key> <id> ...fields... command. */
|
||||
if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0;
|
||||
if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
|
||||
if (rioWriteBulkObject(r,key) == 0) return 0;
|
||||
if (rioWriteBulkStreamID(r,&id) == 0) return 0;
|
||||
while(numfields--) {
|
||||
unsigned char *field, *value;
|
||||
int64_t field_len, value_len;
|
||||
streamIteratorGetField(&si,&field,&value,&field_len,&value_len);
|
||||
if (rioWriteBulkString(r,(char*)field,field_len) == 0) return 0;
|
||||
if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0;
|
||||
}
|
||||
}
|
||||
/* Append XSTREAM SETID after XADD, make sure lastid is correct,
|
||||
* in case of XDEL lastid. */
|
||||
if (rioWriteBulkCount(r,'*',4) == 0) return 0;
|
||||
if (rioWriteBulkString(r,"XSTREAM",7) == 0) return 0;
|
||||
if (rioWriteBulkString(r,"SETID",5) == 0) return 0;
|
||||
if (rioWriteBulkObject(r,key) == 0) return 0;
|
||||
if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
|
||||
} else {
|
||||
/* Using XSTREAM CREATE if the stream is empty. */
|
||||
if (rioWriteBulkCount(r,'*',4) == 0) return 0;
|
||||
if (rioWriteBulkString(r,"XSTREAM",7) == 0) return 0;
|
||||
if (rioWriteBulkString(r,"CREATE",6) == 0) return 0;
|
||||
if (rioWriteBulkObject(r,key) == 0) return 0;
|
||||
if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
|
||||
}
|
||||
|
||||
/* Create all the stream consumer groups. */
|
||||
|
Loading…
x
Reference in New Issue
Block a user