mirror of
https://github.com/fluencelabs/redis
synced 2025-03-19 17:10:50 +00:00
Streams: export iteration API.
This commit is contained in:
parent
9ed40f0fc3
commit
01ea018c40
@ -1425,11 +1425,6 @@ void listTypeConvert(robj *subject, int enc);
|
|||||||
void unblockClientWaitingData(client *c);
|
void unblockClientWaitingData(client *c);
|
||||||
void popGenericCommand(client *c, int where);
|
void popGenericCommand(client *c, int where);
|
||||||
|
|
||||||
/* Stream data type. */
|
|
||||||
stream *streamNew(void);
|
|
||||||
void freeStream(stream *s);
|
|
||||||
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count);
|
|
||||||
|
|
||||||
/* MULTI/EXEC/WATCH... */
|
/* MULTI/EXEC/WATCH... */
|
||||||
void unwatchAllKeys(client *c);
|
void unwatchAllKeys(client *c);
|
||||||
void initClientMultiState(client *c);
|
void initClientMultiState(client *c);
|
||||||
|
31
src/stream.h
31
src/stream.h
@ -19,4 +19,35 @@ typedef struct stream {
|
|||||||
streamID last_id; /* Zero if there are yet no items. */
|
streamID last_id; /* Zero if there are yet no items. */
|
||||||
} stream;
|
} stream;
|
||||||
|
|
||||||
|
/* We define an iterator to iterate stream items in an abstract way, without
|
||||||
|
* caring about the radix tree + listpack representation. Technically speaking
|
||||||
|
* the iterator is only used inside streamReplyWithRange(), so could just
|
||||||
|
* be implemented inside the function, but practically there is the AOF
|
||||||
|
* rewriting code that also needs to iterate the stream to emit the XADD
|
||||||
|
* commands. */
|
||||||
|
typedef struct streamIterator {
|
||||||
|
uint64_t start_key[2]; /* Start key as 128 bit big endian. */
|
||||||
|
uint64_t end_key[2]; /* End key as 128 bit big endian. */
|
||||||
|
raxIterator ri; /* Rax iterator. */
|
||||||
|
unsigned char *lp; /* Current listpack. */
|
||||||
|
unsigned char *lp_ele; /* Current listpack cursor. */
|
||||||
|
/* Buffers used to hold the string of lpGet() when the element is
|
||||||
|
* integer encoded, so that there is no string representation of the
|
||||||
|
* element inside the listpack itself. */
|
||||||
|
unsigned char field_buf[LP_INTBUF_SIZE];
|
||||||
|
unsigned char value_buf[LP_INTBUF_SIZE];
|
||||||
|
} streamIterator;
|
||||||
|
|
||||||
|
/* Prototypes of exported APIs. */
|
||||||
|
|
||||||
|
struct client;
|
||||||
|
|
||||||
|
stream *streamNew(void);
|
||||||
|
void freeStream(stream *s);
|
||||||
|
size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count);
|
||||||
|
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end);
|
||||||
|
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
|
||||||
|
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
|
||||||
|
void streamIteratorStop(streamIterator *si);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -196,25 +196,6 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
|
|||||||
return C_OK;
|
return C_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* We define an iterator to iterate stream items in an abstract way, without
|
|
||||||
* caring about the radix tree + listpack representation. Technically speaking
|
|
||||||
* the iterator is only used inside streamReplyWithRange(), so could just
|
|
||||||
* be implemented inside the function, but practically there is the AOF
|
|
||||||
* rewriting code that also needs to iterate the stream to emit the XADD
|
|
||||||
* commands. */
|
|
||||||
typedef struct streamIterator {
|
|
||||||
uint64_t start_key[2]; /* Start key as 128 bit big endian. */
|
|
||||||
uint64_t end_key[2]; /* End key as 128 bit big endian. */
|
|
||||||
raxIterator ri; /* Rax iterator. */
|
|
||||||
unsigned char *lp; /* Current listpack. */
|
|
||||||
unsigned char *lp_ele; /* Current listpack cursor. */
|
|
||||||
/* Buffers used to hold the string of lpGet() when the element is
|
|
||||||
* integer encoded, so that there is no string representation of the
|
|
||||||
* element inside the listpack itself. */
|
|
||||||
unsigned char field_buf[LP_INTBUF_SIZE];
|
|
||||||
unsigned char value_buf[LP_INTBUF_SIZE];
|
|
||||||
} streamIterator;
|
|
||||||
|
|
||||||
/* Initialize the stream iterator, so that we can call iterating functions
|
/* Initialize the stream iterator, so that we can call iterating functions
|
||||||
* to get the next items. This requires a corresponding streamIteratorStop()
|
* to get the next items. This requires a corresponding streamIteratorStop()
|
||||||
* at the end.
|
* at the end.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user