diff --git a/src/server.h b/src/server.h index 37df429b..bc572b1e 100644 --- a/src/server.h +++ b/src/server.h @@ -1425,11 +1425,6 @@ void listTypeConvert(robj *subject, int enc); void unblockClientWaitingData(client *c); void popGenericCommand(client *c, int where); -/* Stream data type. */ -stream *streamNew(void); -void freeStream(stream *s); -size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count); - /* MULTI/EXEC/WATCH... */ void unwatchAllKeys(client *c); void initClientMultiState(client *c); diff --git a/src/stream.h b/src/stream.h index e78af5bc..e3800932 100644 --- a/src/stream.h +++ b/src/stream.h @@ -19,4 +19,35 @@ typedef struct stream { streamID last_id; /* Zero if there are yet no items. */ } stream; +/* We define an iterator to iterate stream items in an abstract way, without + * caring about the radix tree + listpack representation. Technically speaking + * the iterator is only used inside streamReplyWithRange(), so could just + * be implemented inside the function, but practically there is the AOF + * rewriting code that also needs to iterate the stream to emit the XADD + * commands. */ +typedef struct streamIterator { + uint64_t start_key[2]; /* Start key as 128 bit big endian. */ + uint64_t end_key[2]; /* End key as 128 bit big endian. */ + raxIterator ri; /* Rax iterator. */ + unsigned char *lp; /* Current listpack. */ + unsigned char *lp_ele; /* Current listpack cursor. */ + /* Buffers used to hold the string of lpGet() when the element is + * integer encoded, so that there is no string representation of the + * element inside the listpack itself. */ + unsigned char field_buf[LP_INTBUF_SIZE]; + unsigned char value_buf[LP_INTBUF_SIZE]; +} streamIterator; + +/* Prototypes of exported APIs. */ + +struct client; + +stream *streamNew(void); +void freeStream(stream *s); +size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count); +void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end); +int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); +void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); +void streamIteratorStop(streamIterator *si); + #endif diff --git a/src/t_stream.c b/src/t_stream.c index de9561a5..3144adc7 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -196,25 +196,6 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id, return C_OK; } -/* We define an iterator to iterate stream items in an abstract way, without - * caring about the radix tree + listpack representation. Technically speaking - * the iterator is only used inside streamReplyWithRange(), so could just - * be implemented inside the function, but practically there is the AOF - * rewriting code that also needs to iterate the stream to emit the XADD - * commands. */ -typedef struct streamIterator { - uint64_t start_key[2]; /* Start key as 128 bit big endian. */ - uint64_t end_key[2]; /* End key as 128 bit big endian. */ - raxIterator ri; /* Rax iterator. */ - unsigned char *lp; /* Current listpack. */ - unsigned char *lp_ele; /* Current listpack cursor. */ - /* Buffers used to hold the string of lpGet() when the element is - * integer encoded, so that there is no string representation of the - * element inside the listpack itself. */ - unsigned char field_buf[LP_INTBUF_SIZE]; - unsigned char value_buf[LP_INTBUF_SIZE]; -} streamIterator; - /* Initialize the stream iterator, so that we can call iterating functions * to get the next items. This requires a corresponding streamIteratorStop() * at the end.