Skip to main content

redis/commands/
streams.rs

1//! Defines types to use with the streams commands.
2
3#[cfg(feature = "streams")]
4use crate::{
5    FromRedisValue, RedisWrite, ToRedisArgs, Value,
6    errors::{ParsingError, invalid_type_error},
7    types::HashMap,
8};
9use crate::{from_redis_value, from_redis_value_ref, types::ToSingleRedisArg};
10
11// Stream Maxlen Enum
12
/// Utility enum for passing `MAXLEN [=|~] <count>` arguments into
/// `StreamCommands`.
///
/// The value carried by each variant is the count.
#[derive(PartialEq, Eq, Clone, Debug, Copy)]
#[non_exhaustive]
pub enum StreamMaxlen {
    /// Match an exact count (serialized with `=`).
    Equals(usize),
    /// Match an approximate count (serialized with `~`).
    Approx(usize),
}
24
25impl ToRedisArgs for StreamMaxlen {
26    fn write_redis_args<W>(&self, out: &mut W)
27    where
28        W: ?Sized + RedisWrite,
29    {
30        let (ch, val) = match *self {
31            StreamMaxlen::Equals(v) => ("=", v),
32            StreamMaxlen::Approx(v) => ("~", v),
33        };
34        out.write_arg(b"MAXLEN");
35        out.write_arg(ch.as_bytes());
36        val.write_redis_args(out);
37    }
38}
39
/// Utility enum for passing the trimming mode (`=` or `~`)
/// argument into `StreamCommands`.
#[derive(Debug)]
#[non_exhaustive]
pub enum StreamTrimmingMode {
    /// Exact trimming (serialized as `=`).
    Exact,
    /// Approximate trimming (serialized as `~`).
    Approx,
}
50
51impl ToRedisArgs for StreamTrimmingMode {
52    fn write_redis_args<W>(&self, out: &mut W)
53    where
54        W: ?Sized + RedisWrite,
55    {
56        match self {
57            Self::Exact => out.write_arg(b"="),
58            Self::Approx => out.write_arg(b"~"),
59        };
60    }
61}
62
/// Utility enum for passing `<MAXLEN|MINID> [=|~] threshold [LIMIT count]`
/// arguments into `StreamCommands`.
///
/// Each variant holds the trimming mode (`=` or `~`), the threshold, and the
/// optional limit.
#[derive(Debug)]
#[non_exhaustive]
pub enum StreamTrimStrategy {
    /// Evicts entries as long as the stream's length exceeds the threshold, with an optional limit.
    MaxLen(StreamTrimmingMode, usize, Option<usize>),
    /// Evicts entries with IDs lower than the threshold, where the threshold is a stream ID,
    /// with an optional limit.
    MinId(StreamTrimmingMode, String, Option<usize>),
}
74
75impl StreamTrimStrategy {
76    /// Define a MAXLEN trim strategy with the given maximum number of entries
77    pub fn maxlen(trim: StreamTrimmingMode, max_entries: usize) -> Self {
78        Self::MaxLen(trim, max_entries, None)
79    }
80
81    /// Defines a MINID trim strategy with the given minimum stream ID
82    pub fn minid(trim: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
83        Self::MinId(trim, stream_id.into(), None)
84    }
85
86    /// Set a limit to the number of records to trim in a single operation
87    pub fn limit(self, limit: usize) -> Self {
88        match self {
89            StreamTrimStrategy::MaxLen(m, t, _) => StreamTrimStrategy::MaxLen(m, t, Some(limit)),
90            StreamTrimStrategy::MinId(m, t, _) => StreamTrimStrategy::MinId(m, t, Some(limit)),
91        }
92    }
93}
94
95impl ToRedisArgs for StreamTrimStrategy {
96    fn write_redis_args<W>(&self, out: &mut W)
97    where
98        W: ?Sized + RedisWrite,
99    {
100        let limit = match self {
101            StreamTrimStrategy::MaxLen(m, t, limit) => {
102                out.write_arg(b"MAXLEN");
103                m.write_redis_args(out);
104                t.write_redis_args(out);
105                limit
106            }
107            StreamTrimStrategy::MinId(m, t, limit) => {
108                out.write_arg(b"MINID");
109                m.write_redis_args(out);
110                t.write_redis_args(out);
111                limit
112            }
113        };
114        if let Some(limit) = limit {
115            out.write_arg(b"LIMIT");
116            limit.write_redis_args(out);
117        }
118    }
119}
120
/// Builder options for [`xtrim_options`] command
///
/// [`xtrim_options`]: ../trait.Commands.html#method.xtrim_options
///
#[derive(Debug)]
pub struct StreamTrimOptions {
    /// The trim strategy; always written first.
    strategy: StreamTrimStrategy,
    /// Optional deletion policy; written after the strategy when set.
    deletion_policy: Option<StreamDeletionPolicy>,
}
130
131impl StreamTrimOptions {
132    /// Define a MAXLEN trim strategy with the given maximum number of entries
133    pub fn maxlen(mode: StreamTrimmingMode, max_entries: usize) -> Self {
134        Self {
135            strategy: StreamTrimStrategy::maxlen(mode, max_entries),
136            deletion_policy: None,
137        }
138    }
139
140    /// Defines a MINID trim strategy with the given minimum stream ID
141    pub fn minid(mode: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
142        Self {
143            strategy: StreamTrimStrategy::minid(mode, stream_id),
144            deletion_policy: None,
145        }
146    }
147
148    /// Set a limit to the number of records to trim in a single operation
149    pub fn limit(mut self, limit: usize) -> Self {
150        self.strategy = self.strategy.limit(limit);
151        self
152    }
153
154    /// Set the deletion policy for the XTRIM operation
155    pub fn set_deletion_policy(mut self, deletion_policy: StreamDeletionPolicy) -> Self {
156        self.deletion_policy = Some(deletion_policy);
157        self
158    }
159}
160
161impl ToRedisArgs for StreamTrimOptions {
162    fn write_redis_args<W>(&self, out: &mut W)
163    where
164        W: ?Sized + RedisWrite,
165    {
166        self.strategy.write_redis_args(out);
167        if let Some(deletion_policy) = self.deletion_policy.as_ref() {
168            deletion_policy.write_redis_args(out);
169        }
170    }
171}
172
/// Builder options for [`xadd_options`] command
///
/// [`xadd_options`]: ../trait.Commands.html#method.xadd_options
///
#[derive(Default, Debug)]
pub struct StreamAddOptions {
    /// Whether to write the `NOMKSTREAM` flag.
    nomkstream: bool,
    /// Optional trim strategy applied while adding.
    trim: Option<StreamTrimStrategy>,
    /// Optional deletion policy for the XADD operation.
    deletion_policy: Option<StreamDeletionPolicy>,
}
183
184impl StreamAddOptions {
185    /// Set the NOMKSTREAM flag on which prevents creating a stream for the XADD operation
186    pub fn nomkstream(mut self) -> Self {
187        self.nomkstream = true;
188        self
189    }
190
191    /// Enable trimming when adding using the given trim strategy
192    pub fn trim(mut self, trim: StreamTrimStrategy) -> Self {
193        self.trim = Some(trim);
194        self
195    }
196
197    /// Set the deletion policy for the XADD operation
198    pub fn set_deletion_policy(mut self, deletion_policy: StreamDeletionPolicy) -> Self {
199        self.deletion_policy = Some(deletion_policy);
200        self
201    }
202}
203
204impl ToRedisArgs for StreamAddOptions {
205    fn write_redis_args<W>(&self, out: &mut W)
206    where
207        W: ?Sized + RedisWrite,
208    {
209        if self.nomkstream {
210            out.write_arg(b"NOMKSTREAM");
211        }
212        if let Some(strategy) = self.trim.as_ref() {
213            strategy.write_redis_args(out);
214        }
215        if let Some(deletion_policy) = self.deletion_policy.as_ref() {
216            deletion_policy.write_redis_args(out);
217        }
218    }
219}
220
/// Builder options for [`xautoclaim_options`] command.
///
/// [`xautoclaim_options`]: ../trait.Commands.html#method.xautoclaim_options
///
#[derive(Default, Debug)]
pub struct StreamAutoClaimOptions {
    /// Value for the `COUNT <count>` cmd arg, if set.
    count: Option<usize>,
    /// Whether to write the `JUSTID` cmd arg.
    justid: bool,
}
230
231impl StreamAutoClaimOptions {
232    /// Sets the maximum number of elements to claim per stream.
233    pub fn count(mut self, n: usize) -> Self {
234        self.count = Some(n);
235        self
236    }
237
238    /// Set `JUSTID` cmd arg to true. Be advised: the response
239    /// type changes with this option.
240    pub fn with_justid(mut self) -> Self {
241        self.justid = true;
242        self
243    }
244}
245
246impl ToRedisArgs for StreamAutoClaimOptions {
247    fn write_redis_args<W>(&self, out: &mut W)
248    where
249        W: ?Sized + RedisWrite,
250    {
251        if let Some(ref count) = self.count {
252            out.write_arg(b"COUNT");
253            out.write_arg(format!("{count}").as_bytes());
254        }
255        if self.justid {
256            out.write_arg(b"JUSTID");
257        }
258    }
259}
260
/// Builder options for [`xclaim_options`] command.
///
/// [`xclaim_options`]: ../trait.Commands.html#method.xclaim_options
///
#[derive(Default, Debug)]
pub struct StreamClaimOptions {
    /// Value for the `IDLE <milliseconds>` cmd arg, if set.
    idle: Option<usize>,
    /// Value for the `TIME <Unix epoch milliseconds>` cmd arg, if set.
    time: Option<usize>,
    /// Value for the `RETRYCOUNT <count>` cmd arg, if set.
    retry: Option<usize>,
    /// Whether to write the `FORCE` cmd arg.
    force: bool,
    /// Whether to write the `JUSTID` cmd arg. Be advised: the response
    /// type changes with this option.
    justid: bool,
    /// Value for the `LASTID <lastid>` cmd arg, if set.
    lastid: Option<String>,
}
281
282impl StreamClaimOptions {
283    /// Set `IDLE <milliseconds>` cmd arg.
284    pub fn idle(mut self, ms: usize) -> Self {
285        self.idle = Some(ms);
286        self
287    }
288
289    /// Set `TIME <Unix epoch milliseconds>` cmd arg.
290    pub fn time(mut self, ms_time: usize) -> Self {
291        self.time = Some(ms_time);
292        self
293    }
294
295    /// Set `RETRYCOUNT <count>` cmd arg.
296    pub fn retry(mut self, count: usize) -> Self {
297        self.retry = Some(count);
298        self
299    }
300
301    /// Set `FORCE` cmd arg to true.
302    pub fn with_force(mut self) -> Self {
303        self.force = true;
304        self
305    }
306
307    /// Set `JUSTID` cmd arg to true. Be advised: the response
308    /// type changes with this option.
309    pub fn with_justid(mut self) -> Self {
310        self.justid = true;
311        self
312    }
313
314    /// Set `LASTID <lastid>` cmd arg.
315    pub fn with_lastid(mut self, lastid: impl Into<String>) -> Self {
316        self.lastid = Some(lastid.into());
317        self
318    }
319}
320
321impl ToRedisArgs for StreamClaimOptions {
322    fn write_redis_args<W>(&self, out: &mut W)
323    where
324        W: ?Sized + RedisWrite,
325    {
326        if let Some(ref ms) = self.idle {
327            out.write_arg(b"IDLE");
328            out.write_arg(format!("{ms}").as_bytes());
329        }
330        if let Some(ref ms_time) = self.time {
331            out.write_arg(b"TIME");
332            out.write_arg(format!("{ms_time}").as_bytes());
333        }
334        if let Some(ref count) = self.retry {
335            out.write_arg(b"RETRYCOUNT");
336            out.write_arg(format!("{count}").as_bytes());
337        }
338        if self.force {
339            out.write_arg(b"FORCE");
340        }
341        if self.justid {
342            out.write_arg(b"JUSTID");
343        }
344        if let Some(ref lastid) = self.lastid {
345            out.write_arg(b"LASTID");
346            lastid.write_redis_args(out);
347        }
348    }
349}
350
/// Argument to `StreamReadOptions`
/// Represents the Redis `GROUP <groupname> <consumername>` cmd arg.
/// This option will toggle the cmd from `XREAD` to `XREADGROUP`
///
/// When set, the tuple holds the already-encoded args of the group name (first)
/// and of the consumer name (second).
type SRGroup = Option<(Vec<Vec<u8>>, Vec<Vec<u8>>)>;
/// Builder options for [`xread_options`] command.
///
/// [`xread_options`]: ../trait.Commands.html#method.xread_options
///
#[derive(Default, Debug)]
pub struct StreamReadOptions {
    /// Value for the `BLOCK <milliseconds>` cmd arg, if set.
    block: Option<usize>,
    /// Value for the `COUNT <count>` cmd arg, if set.
    count: Option<usize>,
    /// Whether to write the `NOACK` cmd arg; only written when a group is set.
    noack: Option<bool>,
    /// Value for the `GROUP <groupname> <consumername>` cmd arg, if set.
    /// This option will toggle the cmd from XREAD to XREADGROUP.
    group: SRGroup,
    /// Value for the `CLAIM <min-idle-time>` cmd arg, if set; only written when a group is set.
    /// The `<min-idle-time>` is specified in milliseconds.
    claim: Option<usize>,
}
374
375impl StreamReadOptions {
376    /// Indicates whether the command is participating in a group
377    /// and generating ACKs
378    pub fn read_only(&self) -> bool {
379        self.group.is_none()
380    }
381
382    /// Sets the command so that it avoids adding the message
383    /// to the PEL in cases where reliability is not a requirement
384    /// and the occasional message loss is acceptable.
385    pub fn noack(mut self) -> Self {
386        self.noack = Some(true);
387        self
388    }
389
390    /// Sets the block time in milliseconds.
391    pub fn block(mut self, ms: usize) -> Self {
392        self.block = Some(ms);
393        self
394    }
395
396    /// Sets the maximum number of elements to return per stream.
397    pub fn count(mut self, n: usize) -> Self {
398        self.count = Some(n);
399        self
400    }
401
402    /// Sets the name of a consumer group associated to the stream.
403    pub fn group<GN: ToRedisArgs, CN: ToRedisArgs>(
404        mut self,
405        group_name: GN,
406        consumer_name: CN,
407    ) -> Self {
408        self.group = Some((
409            ToRedisArgs::to_redis_args(&group_name),
410            ToRedisArgs::to_redis_args(&consumer_name),
411        ));
412        self
413    }
414
415    /// Set the minimum idle time for the CLAIM parameter.
416    pub fn claim(mut self, min_idle_time: usize) -> Self {
417        self.claim = Some(min_idle_time);
418        self
419    }
420}
421
422impl ToRedisArgs for StreamReadOptions {
423    fn write_redis_args<W>(&self, out: &mut W)
424    where
425        W: ?Sized + RedisWrite,
426    {
427        if let Some(ref group) = self.group {
428            out.write_arg(b"GROUP");
429            for i in &group.0 {
430                out.write_arg(i);
431            }
432            for i in &group.1 {
433                out.write_arg(i);
434            }
435        }
436
437        if let Some(ref ms) = self.block {
438            out.write_arg(b"BLOCK");
439            out.write_arg(format!("{ms}").as_bytes());
440        }
441
442        if let Some(ref n) = self.count {
443            out.write_arg(b"COUNT");
444            out.write_arg(format!("{n}").as_bytes());
445        }
446
447        if self.group.is_some() {
448            // noack is only available w/ xreadgroup
449            if self.noack == Some(true) {
450                out.write_arg(b"NOACK");
451            }
452            // claim is only available w/ xreadgroup
453            if let Some(ref min_idle_time) = self.claim {
454                out.write_arg(b"CLAIM");
455                out.write_arg(format!("{min_idle_time}").as_bytes());
456            }
457        }
458    }
459}
460
/// Reply type used with the [`xautoclaim_options`] command.
///
/// [`xautoclaim_options`]: ../trait.Commands.html#method.xautoclaim_options
///
#[derive(Default, Debug, Clone)]
pub struct StreamAutoClaimReply {
    /// The next stream id to use as the start argument for the next xautoclaim.
    pub next_stream_id: String,
    /// The entries claimed for the consumer. When JUSTID is enabled, the map in each entry is empty.
    pub claimed: Vec<StreamId>,
    /// The list of stream ids that were removed due to no longer being in the stream.
    /// Empty when the reply does not contain this element.
    pub deleted_ids: Vec<String>,
    /// If set, this means that the reply contained invalid nil entries, that were skipped during parsing.
    ///
    /// This should only happen when using Redis 6, see <https://github.com/redis-rs/redis-rs/issues/1798>
    pub invalid_entries: bool,
}
478
/// Reply type used with [`xread`] or [`xread_options`] commands.
///
/// [`xread`]: ../trait.Commands.html#method.xread
/// [`xread_options`]: ../trait.Commands.html#method.xread_options
///
#[derive(Default, Debug, Clone)]
pub struct StreamReadReply {
    /// One [`StreamKey`] per stream key in the reply, each holding the
    /// entries parsed for that key.
    pub keys: Vec<StreamKey>,
}
489
/// Reply type used with [`xrange`], [`xrange_count`], [`xrange_all`], [`xrevrange`], [`xrevrange_count`], [`xrevrange_all`] commands.
///
/// Represents stream entries matching a given range of `id`'s.
///
/// [`xrange`]: ../trait.Commands.html#method.xrange
/// [`xrange_count`]: ../trait.Commands.html#method.xrange_count
/// [`xrange_all`]: ../trait.Commands.html#method.xrange_all
/// [`xrevrange`]: ../trait.Commands.html#method.xrevrange
/// [`xrevrange_count`]: ../trait.Commands.html#method.xrevrange_count
/// [`xrevrange_all`]: ../trait.Commands.html#method.xrevrange_all
///
#[derive(Default, Debug, Clone)]
pub struct StreamRangeReply {
    /// One [`StreamId`] (entry ID plus field/value map) per entry in the reply.
    pub ids: Vec<StreamId>,
}
506
/// Reply type used with [`xclaim`] command.
///
/// Represents that ownership of the specified messages was changed.
///
/// [`xclaim`]: ../trait.Commands.html#method.xclaim
///
#[derive(Default, Debug, Clone)]
pub struct StreamClaimReply {
    /// One [`StreamId`] (entry ID plus field/value map) per entry in the reply.
    pub ids: Vec<StreamId>,
}
518
/// Reply type used with [`xpending`] command.
///
/// Data returned here were fetched from the stream without
/// having been acknowledged.
///
/// [`xpending`]: ../trait.Commands.html#method.xpending
///
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub enum StreamPendingReply {
    /// The reply reported a count of zero, i.e. there is nothing pending.
    #[default]
    Empty,
    /// The reply reported a non-zero count; carries the parsed summary data.
    Data(StreamPendingData),
}
535
536impl StreamPendingReply {
537    /// Returns how many records are in the reply.
538    pub fn count(&self) -> usize {
539        match self {
540            StreamPendingReply::Empty => 0,
541            StreamPendingReply::Data(x) => x.count,
542        }
543    }
544}
545
/// Inner reply type when an [`xpending`] command has data.
///
/// [`xpending`]: ../trait.Commands.html#method.xpending
#[derive(Default, Debug, Clone)]
pub struct StreamPendingData {
    /// Number of pending messages reported by the reply.
    // NOTE(review): the Redis XPENDING summary form documents this as the total
    // number of pending messages for the group — confirm against the parser.
    pub count: usize,
    /// ID for the first pending record.
    pub start_id: String,
    /// ID for the final pending record.
    pub end_id: String,
    /// Every consumer in the consumer group with at
    /// least one pending message,
    /// and the number of pending messages it has.
    pub consumers: Vec<StreamInfoConsumer>,
}
562
/// Reply type used with [`xpending_count`] and
/// [`xpending_consumer_count`] commands.
///
/// Data returned here have been fetched from the stream without
/// any acknowledgement.
///
/// [`xpending_count`]: ../trait.Commands.html#method.xpending_count
/// [`xpending_consumer_count`]: ../trait.Commands.html#method.xpending_consumer_count
///
#[derive(Default, Debug, Clone)]
pub struct StreamPendingCountReply {
    /// One [`StreamPendingId`] per message yet to be acknowledged: its ID,
    /// the consumer that currently owns it, the time since it was last
    /// delivered, and the number of times it was delivered.
    pub ids: Vec<StreamPendingId>,
}
579
/// Reply type used with [`xinfo_stream`] command, containing
/// general information about the stream stored at the specified key.
///
/// The very first and last entries in the stream are included,
/// in order to give some sense of what the stream content is.
///
/// [`xinfo_stream`]: ../trait.Commands.html#method.xinfo_stream
///
#[derive(Default, Debug, Clone)]
pub struct StreamInfoStreamReply {
    /// The last generated ID that may not be the same as the last
    /// entry ID in case some entry was deleted.
    pub last_generated_id: String,
    /// Details about the radix tree representing the stream mostly
    /// useful for optimization and debugging tasks.
    pub radix_tree_keys: usize,
    /// The number of consumer groups associated with the stream.
    pub groups: usize,
    /// Number of entries in the stream.
    pub length: usize,
    /// The very first entry in the stream.
    pub first_entry: StreamId,
    /// The very last entry in the stream.
    pub last_entry: StreamId,
}
605
/// Reply type used with [`xinfo_consumers`] command, an array of every
/// consumer in a specific consumer group.
///
/// [`xinfo_consumers`]: ../trait.Commands.html#method.xinfo_consumers
///
#[derive(Default, Debug, Clone)]
pub struct StreamInfoConsumersReply {
    /// An array of every consumer in a specific consumer group.
    pub consumers: Vec<StreamInfoConsumer>,
}
616
/// Reply type used with [`xinfo_groups`] command.
///
/// This output represents all the consumer groups associated with
/// the stream.
///
/// [`xinfo_groups`]: ../trait.Commands.html#method.xinfo_groups
///
#[derive(Default, Debug, Clone)]
pub struct StreamInfoGroupsReply {
    /// All the consumer groups associated with the stream, one
    /// [`StreamInfoGroup`] each.
    pub groups: Vec<StreamInfoGroup>,
}
629
/// A consumer parsed from [`xinfo_consumers`] command.
///
/// [`xinfo_consumers`]: ../trait.Commands.html#method.xinfo_consumers
///
#[derive(Default, Debug, Clone)]
pub struct StreamInfoConsumer {
    /// Name of the consumer.
    pub name: String,
    /// Number of pending messages for this specific consumer.
    pub pending: usize,
    /// This consumer's idle time in milliseconds.
    pub idle: usize,
}
643
/// A group parsed from [`xinfo_groups`] command.
///
/// [`xinfo_groups`]: ../trait.Commands.html#method.xinfo_groups
///
#[derive(Default, Debug, Clone)]
pub struct StreamInfoGroup {
    /// The group name.
    pub name: String,
    /// Number of consumers known in the group.
    pub consumers: usize,
    /// Number of pending messages (delivered but not yet acknowledged) in the group.
    pub pending: usize,
    /// Last ID delivered to this group.
    pub last_delivered_id: String,
    /// The logical "read counter" of the last entry delivered to the group's consumers
    /// (or `None` if the server does not provide the value).
    pub entries_read: Option<usize>,
    /// The number of entries in the stream that are still waiting to be delivered to the
    /// group's consumers, or `None` when that number can't be determined.
    pub lag: Option<usize>,
}
665
/// Represents a pending message parsed from [`xpending`] methods.
///
/// [`xpending`]: ../trait.Commands.html#method.xpending
#[derive(Default, Debug, Clone)]
pub struct StreamPendingId {
    /// The ID of the message.
    pub id: String,
    /// The name of the consumer that fetched the message and has
    /// yet to acknowledge it. We call it the current owner
    /// of the message.
    pub consumer: String,
    /// The number of milliseconds that elapsed since the
    /// last time this message was delivered to this consumer.
    pub last_delivered_ms: usize,
    /// The number of times this message was delivered.
    pub times_delivered: usize,
}
683
/// Represents a stream `key` and the entries (`id`'s) parsed for it from `xread` methods.
#[derive(Default, Debug, Clone)]
pub struct StreamKey {
    /// The stream `key`.
    pub key: String,
    /// The parsed stream `id`'s, one [`StreamId`] per entry.
    pub ids: Vec<StreamId>,
}
692
/// Represents a stream `id` and its field/values as a `HashMap`.
///
/// Also contains optional PEL information if the message was fetched with
/// XREADGROUP with a `claim` option.
#[derive(Default, Debug, Clone, PartialEq)]
pub struct StreamId {
    /// The stream `id` (entry ID) of this particular message.
    pub id: String,
    /// All fields in this message, associated with their respective values.
    pub map: HashMap<String, Value>,
    /// The number of milliseconds that elapsed since the last time this entry was delivered to a consumer.
    pub milliseconds_elapsed_from_delivery: Option<usize>,
    /// The number of times this entry was delivered.
    pub delivered_count: Option<usize>,
}
706
707impl StreamId {
708    /// Converts a `Value::Array` into a `StreamId`.
709    fn from_array_value(v: Value) -> Result<Self, ParsingError> {
710        let mut stream_id = StreamId::default();
711        if let Value::Array(mut values) = v {
712            if let Some(v) = values.first_mut() {
713                stream_id.id = from_redis_value(std::mem::take(v))?;
714            }
715            if let Some(v) = values.first_mut() {
716                stream_id.map = from_redis_value(std::mem::take(v))?;
717            }
718        }
719
720        Ok(stream_id)
721    }
722
723    /// Fetches value of a given field and converts it to the specified
724    /// type.
725    pub fn get<T: FromRedisValue>(&self, key: &str) -> Option<T> {
726        match self.map.get(key) {
727            Some(x) => from_redis_value_ref(x).ok(),
728            None => None,
729        }
730    }
731
732    /// Does the message contain a particular field?
733    pub fn contains_key(&self, key: &str) -> bool {
734        self.map.contains_key(key)
735    }
736
737    /// Returns how many field/value pairs exist in this message.
738    pub fn len(&self) -> usize {
739        self.map.len()
740    }
741
742    /// Returns true if there are no field/value pairs in this message.
743    pub fn is_empty(&self) -> bool {
744        self.len() == 0
745    }
746}
747
/// Intermediate shape used to parse the claimed entries of a full (non-JUSTID)
/// autoclaim reply: one map per row, keyed by entry ID, holding that entry's
/// field/value map.
type SACRows = Vec<HashMap<String, HashMap<String, Value>>>;
749
impl FromRedisValue for StreamAutoClaimReply {
    /// Parses an autoclaim reply.
    ///
    /// The value must be an array of two or three items: the next stream id,
    /// the claimed entries, and optionally the deleted ids. The claimed
    /// entries are parsed either as a list of IDs (JUSTID response) or as full
    /// entries, depending on the type of the first non-nil element.
    ///
    /// `invalid_entries` is set when fewer entries were produced than the
    /// claimed array contained.
    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
        let Value::Array(mut items) = v else {
            invalid_type_error!("Not a array response", v);
        };

        if items.len() > 3 || items.len() < 2 {
            invalid_type_error!("Incorrect number of items", &items);
        }

        // Items are popped from the back: deleted ids (only present in the
        // three-item form), then the claimed entries, then the next stream id.
        let deleted_ids = if items.len() == 3 {
            from_redis_value(items.pop().unwrap())?
        } else {
            Vec::new()
        };
        // safe, because we've checked for length beforehand
        let claimed = items.pop().unwrap();
        let next_stream_id = from_redis_value(items.pop().unwrap())?;

        let Value::Array(arr) = &claimed else {
            invalid_type_error!("Incorrect type", claimed)
        };
        // The first non-nil element decides which response format is parsed.
        // If every element is nil (or there are none), nothing was claimed.
        let Some(entry) = arr.iter().find(|val| !matches!(val, Value::Nil)) else {
            return Ok(Self {
                next_stream_id,
                claimed: Vec::new(),
                deleted_ids,
                invalid_entries: !arr.is_empty(),
            });
        };
        let (claimed, invalid_entries) = match entry {
            Value::BulkString(_) => {
                // JUSTID response
                let claimed_count = arr.len();
                let ids: Vec<Option<String>> = from_redis_value(claimed)?;

                // Keep only the ids that are present; the entry maps stay empty.
                let claimed: Vec<_> = ids
                    .into_iter()
                    .filter_map(|id| {
                        id.map(|id| StreamId {
                            id,
                            ..Default::default()
                        })
                    })
                    .collect();
                // This means that some nil entries were filtered
                let invalid_entries = claimed.len() < claimed_count;
                (claimed, invalid_entries)
            }
            Value::Array(_) => {
                // full response
                let claimed_count = arr.len();
                let rows: SACRows = from_redis_value(claimed)?;

                let claimed: Vec<_> = rows
                    .into_iter()
                    .flat_map(|row| {
                        row.into_iter().map(|(id, map)| StreamId {
                            id,
                            map,
                            milliseconds_elapsed_from_delivery: None,
                            delivered_count: None,
                        })
                    })
                    .collect();
                // This means that some nil entries were filtered
                let invalid_entries = claimed.len() < claimed_count;
                (claimed, invalid_entries)
            }
            _ => invalid_type_error!("Incorrect type", claimed),
        };

        Ok(Self {
            next_stream_id,
            claimed,
            deleted_ids,
            invalid_entries,
        })
    }
}
830
/// Intermediate shape for a standard read reply: per row, a map from stream
/// key to its entries, where each entry is a map from entry ID to that entry's
/// field/value map.
type SRRows = Vec<HashMap<String, Vec<HashMap<String, HashMap<String, Value>>>>>;
/// Intermediate shape for a read reply whose entries carry two extra trailing
/// values: per row, a map from stream key to a list of
/// `(id, field/value map, ms elapsed since delivery, delivery count)` tuples.
type SRClaimRows =
    Vec<HashMap<String, Vec<(String, HashMap<String, Value>, Option<usize>, Option<usize>)>>>;
834
impl FromRedisValue for StreamReadReply {
    /// Parses a read reply, trying the standard entry format first and the
    /// XREADGROUP-with-CLAIM format second.
    ///
    /// Fails with an invalid-type error if neither format matches.
    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
        // Try to parse as the standard format first
        // (`v` is cloned because each attempt consumes its input and `v` is
        // still needed afterwards.)
        if let Ok(rows) = from_redis_value::<SRRows>(v.clone()) {
            return Ok(Self::from_standard_rows(rows));
        }

        // If that fails, try to parse as XREADGROUP with CLAIM format
        // Format: [[stream_name, [[id, [field, value, ...], ms_elapsed, delivery_count], ...]]]
        if let Ok(rows) = from_redis_value::<SRClaimRows>(v.clone()) {
            return Ok(Self::from_claim_rows(rows));
        }

        invalid_type_error!("Could not parse StreamReadReply in any known format", v)
    }
}
851
852impl StreamReadReply {
853    fn from_standard_rows(rows: SRRows) -> Self {
854        let keys = rows
855            .into_iter()
856            .flat_map(|row| {
857                row.into_iter().map(|(key, entries)| StreamKey {
858                    key,
859                    ids: entries
860                        .into_iter()
861                        .flat_map(|id_row| {
862                            id_row.into_iter().map(|(id, map)| StreamId {
863                                id,
864                                map,
865                                milliseconds_elapsed_from_delivery: None,
866                                delivered_count: None,
867                            })
868                        })
869                        .collect(),
870                })
871            })
872            .collect();
873        StreamReadReply { keys }
874    }
875
876    fn from_claim_rows(rows: SRClaimRows) -> Self {
877        let keys = rows
878            .into_iter()
879            .flat_map(|row| {
880                row.into_iter().map(|(key, entries)| StreamKey {
881                    key,
882                    ids: entries
883                        .into_iter()
884                        .map(
885                            |(id, map, milliseconds_elapsed_from_delivery, delivered_count)| {
886                                StreamId {
887                                    id,
888                                    map,
889                                    milliseconds_elapsed_from_delivery,
890                                    delivered_count,
891                                }
892                            },
893                        )
894                        .collect(),
895                })
896            })
897            .collect();
898        StreamReadReply { keys }
899    }
900}
901
902impl FromRedisValue for StreamRangeReply {
903    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
904        let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
905        let ids: Vec<StreamId> = rows
906            .into_iter()
907            .flat_map(|row| {
908                row.into_iter().map(|(id, map)| StreamId {
909                    id,
910                    map,
911                    milliseconds_elapsed_from_delivery: None,
912                    delivered_count: None,
913                })
914            })
915            .collect();
916        Ok(StreamRangeReply { ids })
917    }
918}
919
920impl FromRedisValue for StreamClaimReply {
921    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
922        let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
923        let ids: Vec<StreamId> = rows
924            .into_iter()
925            .flat_map(|row| {
926                row.into_iter().map(|(id, map)| StreamId {
927                    id,
928                    map,
929                    milliseconds_elapsed_from_delivery: None,
930                    delivered_count: None,
931                })
932            })
933            .collect();
934        Ok(StreamClaimReply { ids })
935    }
936}
937
938type SPRInner = (
939    usize,
940    Option<String>,
941    Option<String>,
942    Vec<Option<(String, String)>>,
943);
944impl FromRedisValue for StreamPendingReply {
945    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
946        let (count, start, end, consumer_data): SPRInner = from_redis_value(v)?;
947
948        if count == 0 {
949            Ok(StreamPendingReply::Empty)
950        } else {
951            let mut result = StreamPendingData::default();
952
953            let start_id = start.ok_or_else(|| {
954                ParsingError::from(arcstr::literal!(
955                    "IllegalState: Non-zero pending expects start id"
956                ))
957            })?;
958
959            let end_id = end.ok_or_else(|| {
960                ParsingError::from(arcstr::literal!(
961                    "IllegalState: Non-zero pending expects end id"
962                ))
963            })?;
964
965            result.count = count;
966            result.start_id = start_id;
967            result.end_id = end_id;
968
969            result.consumers = consumer_data
970                .into_iter()
971                .flatten()
972                .map(|(name, pending)| StreamInfoConsumer {
973                    name,
974                    pending: pending.parse().unwrap_or_default(),
975                    ..Default::default()
976                })
977                .collect();
978
979            Ok(StreamPendingReply::Data(result))
980        }
981    }
982}
983
984impl FromRedisValue for StreamPendingCountReply {
985    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
986        let mut reply = StreamPendingCountReply::default();
987        match v {
988            Value::Array(outer_tuple) => {
989                for outer in outer_tuple {
990                    match outer {
991                        Value::Array(inner_tuple) => match &inner_tuple[..] {
992                            [
993                                Value::BulkString(id_bytes),
994                                Value::BulkString(consumer_bytes),
995                                Value::Int(last_delivered_ms_u64),
996                                Value::Int(times_delivered_u64),
997                            ] => {
998                                let id = String::from_utf8(id_bytes.to_vec())?;
999                                let consumer = String::from_utf8(consumer_bytes.to_vec())?;
1000                                let last_delivered_ms = *last_delivered_ms_u64 as usize;
1001                                let times_delivered = *times_delivered_u64 as usize;
1002                                reply.ids.push(StreamPendingId {
1003                                    id,
1004                                    consumer,
1005                                    last_delivered_ms,
1006                                    times_delivered,
1007                                });
1008                            }
1009                            _ => fail!(ParsingError::from(arcstr::literal!(
1010                                "Cannot parse redis data (3)"
1011                            ))),
1012                        },
1013                        _ => fail!(ParsingError::from(arcstr::literal!(
1014                            "Cannot parse redis data (2)"
1015                        ))),
1016                    }
1017                }
1018            }
1019            _ => fail!(ParsingError::from(arcstr::literal!(
1020                "Cannot parse redis data (1)"
1021            ))),
1022        };
1023        Ok(reply)
1024    }
1025}
1026
1027impl FromRedisValue for StreamInfoStreamReply {
1028    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
1029        let mut map: HashMap<String, Value> = from_redis_value(v)?;
1030        let mut reply = StreamInfoStreamReply::default();
1031        if let Some(v) = map.remove("last-generated-id") {
1032            reply.last_generated_id = from_redis_value(v)?;
1033        }
1034        if let Some(v) = map.remove("radix-tree-nodes") {
1035            reply.radix_tree_keys = from_redis_value(v)?;
1036        }
1037        if let Some(v) = map.remove("groups") {
1038            reply.groups = from_redis_value(v)?;
1039        }
1040        if let Some(v) = map.remove("length") {
1041            reply.length = from_redis_value(v)?;
1042        }
1043        if let Some(v) = map.remove("first-entry") {
1044            reply.first_entry = StreamId::from_array_value(v)?;
1045        }
1046        if let Some(v) = map.remove("last-entry") {
1047            reply.last_entry = StreamId::from_array_value(v)?;
1048        }
1049        Ok(reply)
1050    }
1051}
1052
1053impl FromRedisValue for StreamInfoConsumersReply {
1054    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
1055        let consumers: Vec<HashMap<String, Value>> = from_redis_value(v)?;
1056        let mut reply = StreamInfoConsumersReply::default();
1057        for mut map in consumers {
1058            let mut c = StreamInfoConsumer::default();
1059            if let Some(v) = map.remove("name") {
1060                c.name = from_redis_value(v)?;
1061            }
1062            if let Some(v) = map.remove("pending") {
1063                c.pending = from_redis_value(v)?;
1064            }
1065            if let Some(v) = map.remove("idle") {
1066                c.idle = from_redis_value(v)?;
1067            }
1068            reply.consumers.push(c);
1069        }
1070
1071        Ok(reply)
1072    }
1073}
1074
1075impl FromRedisValue for StreamInfoGroupsReply {
1076    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
1077        let groups: Vec<HashMap<String, Value>> = from_redis_value(v)?;
1078        let mut reply = StreamInfoGroupsReply::default();
1079        for mut map in groups {
1080            let mut g = StreamInfoGroup::default();
1081            if let Some(v) = map.remove("name") {
1082                g.name = from_redis_value(v)?;
1083            }
1084            if let Some(v) = map.remove("pending") {
1085                g.pending = from_redis_value(v)?;
1086            }
1087            if let Some(v) = map.remove("consumers") {
1088                g.consumers = from_redis_value(v)?;
1089            }
1090            if let Some(v) = map.remove("last-delivered-id") {
1091                g.last_delivered_id = from_redis_value(v)?;
1092            }
1093            if let Some(v) = map.remove("entries-read") {
1094                g.entries_read = if let Value::Nil = v {
1095                    None
1096                } else {
1097                    Some(from_redis_value(v)?)
1098                };
1099            }
1100            if let Some(v) = map.remove("lag") {
1101                g.lag = if let Value::Nil = v {
1102                    None
1103                } else {
1104                    Some(from_redis_value(v)?)
1105                };
1106            }
1107            reply.groups.push(g);
1108        }
1109        Ok(reply)
1110    }
1111}
1112
/// How deleting a stream entry treats references to it held in consumer
/// groups' Pending Entries Lists (PEL).
///
/// Serialized as `KEEPREF`, `DELREF` or `ACKED`.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub enum StreamDeletionPolicy {
    /// Keep the existing references to the deleted entries in every consumer
    /// group's PEL. This is the default.
    #[default]
    KeepRef,
    /// Remove the entry from the stream and from every consumer group's PEL.
    DelRef,
    /// Remove the entry from the stream and from every consumer group's PEL,
    /// but only if all groups have acknowledged it.
    Acked,
}
1125
1126impl ToRedisArgs for StreamDeletionPolicy {
1127    fn write_redis_args<W>(&self, out: &mut W)
1128    where
1129        W: ?Sized + RedisWrite,
1130    {
1131        match self {
1132            StreamDeletionPolicy::KeepRef => out.write_arg(b"KEEPREF"),
1133            StreamDeletionPolicy::DelRef => out.write_arg(b"DELREF"),
1134            StreamDeletionPolicy::Acked => out.write_arg(b"ACKED"),
1135        }
1136    }
1137}
1138impl ToSingleRedisArg for StreamDeletionPolicy {}
1139
/// Per-entry status code returned by the `XDELEX` command.
#[cfg(feature = "streams")]
#[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum XDelExStatusCode {
    /// The stream holds no entry with the given id.
    IdNotFound = -1,
    /// The entry was removed from the stream.
    Deleted = 1,
    /// The entry was kept: either it has not been delivered to any consumer,
    /// or the consumer groups' Pending Entries List (PEL) still references it.
    NotDeletedUnacknowledgedOrStillReferenced = 2,
}
1154
#[cfg(feature = "streams")]
impl FromRedisValue for XDelExStatusCode {
    /// Maps the integer reply codes `-1`, `1` and `2` to their variants.
    ///
    /// # Errors
    ///
    /// Returns a [`ParsingError`] when the value is not an integer or when the
    /// integer is not one of the known status codes.
    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
        let Value::Int(code) = v else {
            // The message names this type; it previously mentioned
            // `XAckDelStatusCode`, which pointed at the wrong command.
            return Err(arcstr::literal!("Response type not XDelExStatusCode compatible").into());
        };
        match code {
            -1 => Ok(XDelExStatusCode::IdNotFound),
            1 => Ok(XDelExStatusCode::Deleted),
            2 => Ok(XDelExStatusCode::NotDeletedUnacknowledgedOrStillReferenced),
            _ => Err(format!("Invalid XDelExStatusCode status code: {code}").into()),
        }
    }
}
1169
/// Per-entry status code returned by the `XACKDEL` command.
#[cfg(feature = "streams")]
#[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum XAckDelStatusCode {
    /// The stream holds no entry with the given id.
    IdNotFound = -1,
    /// The entry was acknowledged and removed from the stream.
    AcknowledgedAndDeleted = 1,
    /// The entry was acknowledged but kept, because the consumer groups'
    /// Pending Entries List (PEL) still references it.
    AcknowledgedNotDeletedStillReferenced = 2,
}
1183
#[cfg(feature = "streams")]
impl FromRedisValue for XAckDelStatusCode {
    /// Maps the integer reply codes `-1`, `1` and `2` to their variants.
    ///
    /// # Errors
    ///
    /// Returns a [`ParsingError`] when the value is not an integer or when the
    /// integer is not one of the known status codes.
    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
        let Value::Int(code) = v else {
            return Err(arcstr::literal!("Response type not XAckDelStatusCode compatible").into());
        };
        match code {
            -1 => Ok(XAckDelStatusCode::IdNotFound),
            1 => Ok(XAckDelStatusCode::AcknowledgedAndDeleted),
            2 => Ok(XAckDelStatusCode::AcknowledgedNotDeletedStillReferenced),
            // `format!` is required here: `arcstr::literal!` does not
            // interpolate, so the message would contain a literal `{code}`.
            _ => Err(format!("Invalid XAckDelStatusCode status code: {code}").into()),
        }
    }
}
1198
1199#[cfg(test)]
1200mod tests {
1201    use super::*;
1202
1203    fn assert_command_eq(object: impl ToRedisArgs, expected: &[u8]) {
1204        let mut out: Vec<Vec<u8>> = Vec::new();
1205
1206        object.write_redis_args(&mut out);
1207
1208        let mut cmd: Vec<u8> = Vec::new();
1209
1210        out.iter_mut().for_each(|item| {
1211            cmd.append(item);
1212            cmd.push(b' ');
1213        });
1214
1215        cmd.pop();
1216
1217        assert_eq!(cmd, expected);
1218    }
1219
1220    mod stream_auto_claim_reply {
1221        use super::*;
1222        use crate::Value;
1223
1224        #[test]
1225        fn short_response() {
1226            let value = Value::Array(vec![Value::BulkString("1713465536578-0".into())]);
1227
1228            StreamAutoClaimReply::from_redis_value(value).unwrap_err();
1229        }
1230
1231        #[test]
1232        fn parses_none_claimed_response() {
1233            let value = Value::Array(vec![
1234                Value::BulkString("0-0".into()),
1235                Value::Array(vec![]),
1236                Value::Array(vec![]),
1237            ]);
1238
1239            let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();
1240
1241            assert_eq!(reply.next_stream_id.as_str(), "0-0");
1242            assert_eq!(reply.claimed.len(), 0);
1243            assert_eq!(reply.deleted_ids.len(), 0);
1244        }
1245
1246        #[test]
1247        fn parses_response() {
1248            let value = Value::Array(vec![
1249                Value::BulkString("1713465536578-0".into()),
1250                Value::Array(vec![
1251                    Value::Array(vec![
1252                        Value::BulkString("1713465533411-0".into()),
1253                        // Both RESP2 and RESP3 expose this map as an array of key/values
1254                        Value::Array(vec![
1255                            Value::BulkString("name".into()),
1256                            Value::BulkString("test".into()),
1257                            Value::BulkString("other".into()),
1258                            Value::BulkString("whaterver".into()),
1259                        ]),
1260                    ]),
1261                    Value::Array(vec![
1262                        Value::BulkString("1713465536069-0".into()),
1263                        Value::Array(vec![
1264                            Value::BulkString("name".into()),
1265                            Value::BulkString("another test".into()),
1266                            Value::BulkString("other".into()),
1267                            Value::BulkString("something".into()),
1268                        ]),
1269                    ]),
1270                ]),
1271                Value::Array(vec![Value::BulkString("123456789-0".into())]),
1272            ]);
1273
1274            let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();
1275
1276            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1277            assert_eq!(reply.claimed.len(), 2);
1278            assert_eq!(reply.claimed[0].id.as_str(), "1713465533411-0");
1279            assert!(
1280                matches!(reply.claimed[0].map.get("name"), Some(Value::BulkString(v)) if v == "test".as_bytes())
1281            );
1282            assert_eq!(reply.claimed[1].id.as_str(), "1713465536069-0");
1283            assert_eq!(reply.deleted_ids.len(), 1);
1284            assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
1285        }
1286
1287        #[test]
1288        fn parses_v6_response() {
1289            let value = Value::Array(vec![
1290                Value::BulkString("1713465536578-0".into()),
1291                Value::Array(vec![
1292                    Value::Array(vec![
1293                        Value::BulkString("1713465533411-0".into()),
1294                        Value::Array(vec![
1295                            Value::BulkString("name".into()),
1296                            Value::BulkString("test".into()),
1297                            Value::BulkString("other".into()),
1298                            Value::BulkString("whaterver".into()),
1299                        ]),
1300                    ]),
1301                    Value::Array(vec![
1302                        Value::BulkString("1713465536069-0".into()),
1303                        Value::Array(vec![
1304                            Value::BulkString("name".into()),
1305                            Value::BulkString("another test".into()),
1306                            Value::BulkString("other".into()),
1307                            Value::BulkString("something".into()),
1308                        ]),
1309                    ]),
1310                ]),
1311                // V6 and lower lack the deleted_ids array
1312            ]);
1313
1314            let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();
1315
1316            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1317            assert_eq!(reply.claimed.len(), 2);
1318            let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1319            assert!(ids.contains(&"1713465533411-0"));
1320            assert!(ids.contains(&"1713465536069-0"));
1321            assert_eq!(reply.deleted_ids.len(), 0);
1322        }
1323
1324        #[test]
1325        fn parses_justid_response() {
1326            let value = Value::Array(vec![
1327                Value::BulkString("1713465536578-0".into()),
1328                Value::Array(vec![
1329                    Value::BulkString("1713465533411-0".into()),
1330                    Value::BulkString("1713465536069-0".into()),
1331                ]),
1332                Value::Array(vec![Value::BulkString("123456789-0".into())]),
1333            ]);
1334
1335            let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();
1336
1337            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1338            assert_eq!(reply.claimed.len(), 2);
1339            let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1340            assert!(ids.contains(&"1713465533411-0"));
1341            assert!(ids.contains(&"1713465536069-0"));
1342            assert_eq!(reply.deleted_ids.len(), 1);
1343            assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
1344        }
1345
1346        #[test]
1347        fn parses_v6_justid_response() {
1348            let value = Value::Array(vec![
1349                Value::BulkString("1713465536578-0".into()),
1350                Value::Array(vec![
1351                    Value::BulkString("1713465533411-0".into()),
1352                    Value::BulkString("1713465536069-0".into()),
1353                ]),
1354                // V6 and lower lack the deleted_ids array
1355            ]);
1356
1357            let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();
1358
1359            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1360            assert_eq!(reply.claimed.len(), 2);
1361            let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1362            assert!(ids.contains(&"1713465533411-0"));
1363            assert!(ids.contains(&"1713465536069-0"));
1364            assert_eq!(reply.deleted_ids.len(), 0);
1365        }
1366    }
1367
1368    mod stream_trim_options {
1369        use super::*;
1370
1371        #[test]
1372        fn maxlen_trim() {
1373            let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10);
1374
1375            assert_command_eq(options, b"MAXLEN ~ 10");
1376        }
1377
1378        #[test]
1379        fn maxlen_exact_trim() {
1380            let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Exact, 10);
1381
1382            assert_command_eq(options, b"MAXLEN = 10");
1383        }
1384
1385        #[test]
1386        fn maxlen_trim_limit() {
1387            let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10).limit(5);
1388
1389            assert_command_eq(options, b"MAXLEN ~ 10 LIMIT 5");
1390        }
1391        #[test]
1392        fn minid_trim_limit() {
1393            let options = StreamTrimOptions::minid(StreamTrimmingMode::Exact, "123456-7").limit(5);
1394
1395            assert_command_eq(options, b"MINID = 123456-7 LIMIT 5");
1396        }
1397    }
1398
1399    mod stream_add_options {
1400        use super::*;
1401
1402        #[test]
1403        fn the_default() {
1404            let options = StreamAddOptions::default();
1405
1406            assert_command_eq(options, b"");
1407        }
1408
1409        #[test]
1410        fn with_maxlen_trim() {
1411            let options = StreamAddOptions::default()
1412                .trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10));
1413
1414            assert_command_eq(options, b"MAXLEN = 10");
1415        }
1416
1417        #[test]
1418        fn with_nomkstream() {
1419            let options = StreamAddOptions::default().nomkstream();
1420
1421            assert_command_eq(options, b"NOMKSTREAM");
1422        }
1423
1424        #[test]
1425        fn with_nomkstream_and_maxlen_trim() {
1426            let options = StreamAddOptions::default()
1427                .nomkstream()
1428                .trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10));
1429
1430            assert_command_eq(options, b"NOMKSTREAM MAXLEN = 10");
1431        }
1432    }
1433}