mirror of
https://github.com/fluencelabs/redis
synced 2025-03-18 16:40:50 +00:00
Streams: propagate specified MAXLEN instead of approximated
Slaves and rebooting redis may have different radix tree struct, by different stream* config options. So propagating approximated MAXLEN to AOF/slaves may lead to date inconsistency.
This commit is contained in:
parent
14d6318b32
commit
9042d1c249
@ -1114,8 +1114,7 @@ int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missin
|
||||
return streamGenericParseIDOrReply(c,o,id,missing_seq,1);
|
||||
}
|
||||
|
||||
|
||||
/* XADD key [MAXLEN <count>] <ID or *> [field value] [field value] ... */
|
||||
/* XADD key [MAXLEN [~|=] <count>] <ID or *> [field value] [field value] ... */
|
||||
void xaddCommand(client *c) {
|
||||
streamID id;
|
||||
int id_given = 0; /* Was an ID different than "*" specified? */
|
||||
@ -1141,6 +1140,8 @@ void xaddCommand(client *c) {
|
||||
if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
|
||||
approx_maxlen = 1;
|
||||
i++;
|
||||
} else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
|
||||
i++;
|
||||
}
|
||||
if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
|
||||
!= C_OK) return;
|
||||
@ -1187,9 +1188,22 @@ void xaddCommand(client *c) {
|
||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
|
||||
server.dirty++;
|
||||
|
||||
/* Notify xtrim event if needed. */
|
||||
if (maxlen >= 0 && streamTrimByLength(s,maxlen,approx_maxlen)) {
|
||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
|
||||
if (maxlen >= 0) {
|
||||
/* Notify xtrim event if needed. */
|
||||
if (streamTrimByLength(s,maxlen,approx_maxlen)) {
|
||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
|
||||
}
|
||||
|
||||
/* Rewrite approximated MAXLEN as specified s->length. */
|
||||
if (approx_maxlen) {
|
||||
robj *maxlen_obj = createStringObjectFromLongLong(s->length);
|
||||
rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj);
|
||||
decrRefCount(maxlen_obj);
|
||||
|
||||
robj *equal_obj = createStringObject("=",1);
|
||||
rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj);
|
||||
decrRefCount(equal_obj);
|
||||
}
|
||||
}
|
||||
|
||||
/* Let's rewrite the ID argument with the one actually generated for
|
||||
@ -2174,7 +2188,7 @@ void xdelCommand(client *c) {
|
||||
*
|
||||
* List of options:
|
||||
*
|
||||
* MAXLEN [~] <count> -- Trim so that the stream will be capped at
|
||||
* MAXLEN [~|=] <count> -- Trim so that the stream will be capped at
|
||||
* the specified length. Use ~ before the
|
||||
* count in order to demand approximated trimming
|
||||
* (like XADD MAXLEN option).
|
||||
@ -2196,6 +2210,7 @@ void xtrimCommand(client *c) {
|
||||
long long maxlen = -1; /* If left to -1 no trimming is performed. */
|
||||
int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so
|
||||
the maxium length is not applied verbatim. */
|
||||
int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */
|
||||
|
||||
/* Parse options. */
|
||||
int i = 2; /* Start of options. */
|
||||
@ -2210,6 +2225,8 @@ void xtrimCommand(client *c) {
|
||||
if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
|
||||
approx_maxlen = 1;
|
||||
i++;
|
||||
} else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
|
||||
i++;
|
||||
}
|
||||
if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
|
||||
!= C_OK) return;
|
||||
@ -2219,6 +2236,7 @@ void xtrimCommand(client *c) {
|
||||
return;
|
||||
}
|
||||
i++;
|
||||
maxlen_arg_idx = i;
|
||||
} else {
|
||||
addReply(c,shared.syntaxerr);
|
||||
return;
|
||||
@ -2239,6 +2257,17 @@ void xtrimCommand(client *c) {
|
||||
signalModifiedKey(c->db,c->argv[1]);
|
||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
|
||||
server.dirty += deleted;
|
||||
|
||||
/* Rewrite approximated MAXLEN as specified s->length. */
|
||||
if (approx_maxlen) {
|
||||
robj *maxlen_obj = createStringObjectFromLongLong(s->length);
|
||||
rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj);
|
||||
decrRefCount(maxlen_obj);
|
||||
|
||||
robj *equal_obj = createStringObject("=",1);
|
||||
rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj);
|
||||
decrRefCount(equal_obj);
|
||||
}
|
||||
}
|
||||
addReplyLongLong(c,deleted);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user