redis/
streams.rs

1//! Defines types to use with the streams commands.
2
3use crate::{
4    from_redis_value, types::HashMap, FromRedisValue, RedisResult, RedisWrite, ToRedisArgs, Value,
5};
6
7use std::io::{Error, ErrorKind};
8
9macro_rules! invalid_type_error {
10    ($v:expr, $det:expr) => {{
11        fail!((
12            $crate::ErrorKind::TypeError,
13            "Response was of incompatible type",
14            format!("{:?} (response was {:?})", $det, $v)
15        ));
16    }};
17}
18
19// Stream Maxlen Enum
20
21/// Utility enum for passing `MAXLEN [= or ~] [COUNT]`
22/// arguments into `StreamCommands`.
23/// The enum value represents the count.
24#[derive(PartialEq, Eq, Clone, Debug, Copy)]
25pub enum StreamMaxlen {
26    /// Match an exact count
27    Equals(usize),
28    /// Match an approximate count
29    Approx(usize),
30}
31
32impl ToRedisArgs for StreamMaxlen {
33    fn write_redis_args<W>(&self, out: &mut W)
34    where
35        W: ?Sized + RedisWrite,
36    {
37        let (ch, val) = match *self {
38            StreamMaxlen::Equals(v) => ("=", v),
39            StreamMaxlen::Approx(v) => ("~", v),
40        };
41        out.write_arg(b"MAXLEN");
42        out.write_arg(ch.as_bytes());
43        val.write_redis_args(out);
44    }
45}
46
47/// Utility enum for passing the trim mode`[=|~]`
48/// arguments into `StreamCommands`.
49#[derive(Debug)]
50pub enum StreamTrimmingMode {
51    /// Match an exact count
52    Exact,
53    /// Match an approximate count
54    Approx,
55}
56
57impl ToRedisArgs for StreamTrimmingMode {
58    fn write_redis_args<W>(&self, out: &mut W)
59    where
60        W: ?Sized + RedisWrite,
61    {
62        match self {
63            Self::Exact => out.write_arg(b"="),
64            Self::Approx => out.write_arg(b"~"),
65        };
66    }
67}
68
69/// Utility enum for passing `<MAXLEN|MINID> [=|~] threshold [LIMIT count]`
70/// arguments into `StreamCommands`.
71/// The enum values the trimming mode (=|~), the threshold, and the optional limit
72#[derive(Debug)]
73pub enum StreamTrimStrategy {
74    /// Evicts entries as long as the streams length exceeds threshold.  With an optional limit.
75    MaxLen(StreamTrimmingMode, usize, Option<usize>),
76    /// Evicts entries with IDs lower than threshold, where threshold is a stream ID With an optional limit.
77    MinId(StreamTrimmingMode, String, Option<usize>),
78}
79
80impl StreamTrimStrategy {
81    /// Define a MAXLEN trim strategy with the given maximum number of entries
82    pub fn maxlen(trim: StreamTrimmingMode, max_entries: usize) -> Self {
83        Self::MaxLen(trim, max_entries, None)
84    }
85
86    /// Defines a MINID trim strategy with the given minimum stream ID
87    pub fn minid(trim: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
88        Self::MinId(trim, stream_id.into(), None)
89    }
90
91    /// Set a limit to the number of records to trim in a single operation
92    pub fn limit(self, limit: usize) -> Self {
93        match self {
94            StreamTrimStrategy::MaxLen(m, t, _) => StreamTrimStrategy::MaxLen(m, t, Some(limit)),
95            StreamTrimStrategy::MinId(m, t, _) => StreamTrimStrategy::MinId(m, t, Some(limit)),
96        }
97    }
98}
99
100impl ToRedisArgs for StreamTrimStrategy {
101    fn write_redis_args<W>(&self, out: &mut W)
102    where
103        W: ?Sized + RedisWrite,
104    {
105        let limit = match self {
106            StreamTrimStrategy::MaxLen(m, t, limit) => {
107                out.write_arg(b"MAXLEN");
108                m.write_redis_args(out);
109                t.write_redis_args(out);
110                limit
111            }
112            StreamTrimStrategy::MinId(m, t, limit) => {
113                out.write_arg(b"MINID");
114                m.write_redis_args(out);
115                t.write_redis_args(out);
116                limit
117            }
118        };
119        if let Some(limit) = limit {
120            out.write_arg(b"LIMIT");
121            limit.write_redis_args(out);
122        }
123    }
124}
125
126/// Builder options for [`xtrim_options`] command
127///
128/// [`xtrim_options`]: ../trait.Commands.html#method.xtrim_options
129///
130#[derive(Debug)]
131pub struct StreamTrimOptions {
132    strategy: StreamTrimStrategy,
133}
134
135impl StreamTrimOptions {
136    /// Define a MAXLEN trim strategy with the given maximum number of entries
137    pub fn maxlen(mode: StreamTrimmingMode, max_entries: usize) -> Self {
138        Self {
139            strategy: StreamTrimStrategy::maxlen(mode, max_entries),
140        }
141    }
142
143    /// Defines a MINID trim strategy with the given minimum stream ID
144    pub fn minid(mode: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
145        Self {
146            strategy: StreamTrimStrategy::minid(mode, stream_id),
147        }
148    }
149
150    /// Set a limit to the number of records to trim in a single operation
151    pub fn limit(mut self, limit: usize) -> Self {
152        self.strategy = self.strategy.limit(limit);
153        self
154    }
155}
156
157impl ToRedisArgs for StreamTrimOptions {
158    fn write_redis_args<W>(&self, out: &mut W)
159    where
160        W: ?Sized + RedisWrite,
161    {
162        self.strategy.write_redis_args(out);
163    }
164}
165
166/// Builder options for [`xadd_options`] command
167///
168/// [`xadd_options`]: ../trait.Commands.html#method.xadd_options
169///
170#[derive(Default, Debug)]
171pub struct StreamAddOptions {
172    nomkstream: bool,
173    trim: Option<StreamTrimStrategy>,
174}
175
176impl StreamAddOptions {
177    /// Set the NOMKSTREAM flag on which prevents creating a stream for the XADD operation
178    pub fn nomkstream(mut self) -> Self {
179        self.nomkstream = true;
180        self
181    }
182
183    /// Enable trimming when adding using the given trim strategy
184    pub fn trim(mut self, trim: StreamTrimStrategy) -> Self {
185        self.trim = Some(trim);
186        self
187    }
188}
189
190impl ToRedisArgs for StreamAddOptions {
191    fn write_redis_args<W>(&self, out: &mut W)
192    where
193        W: ?Sized + RedisWrite,
194    {
195        if self.nomkstream {
196            out.write_arg(b"NOMKSTREAM");
197        }
198        if let Some(strategy) = self.trim.as_ref() {
199            strategy.write_redis_args(out);
200        }
201    }
202}
203
204/// Builder options for [`xautoclaim_options`] command.
205///
206/// [`xautoclaim_options`]: ../trait.Commands.html#method.xautoclaim_options
207///
208#[derive(Default, Debug)]
209pub struct StreamAutoClaimOptions {
210    count: Option<usize>,
211    justid: bool,
212}
213
214impl StreamAutoClaimOptions {
215    /// Sets the maximum number of elements to claim per stream.
216    pub fn count(mut self, n: usize) -> Self {
217        self.count = Some(n);
218        self
219    }
220
221    /// Set `JUSTID` cmd arg to true. Be advised: the response
222    /// type changes with this option.
223    pub fn with_justid(mut self) -> Self {
224        self.justid = true;
225        self
226    }
227}
228
229impl ToRedisArgs for StreamAutoClaimOptions {
230    fn write_redis_args<W>(&self, out: &mut W)
231    where
232        W: ?Sized + RedisWrite,
233    {
234        if let Some(ref count) = self.count {
235            out.write_arg(b"COUNT");
236            out.write_arg(format!("{count}").as_bytes());
237        }
238        if self.justid {
239            out.write_arg(b"JUSTID");
240        }
241    }
242}
243
244/// Builder options for [`xclaim_options`] command.
245///
246/// [`xclaim_options`]: ../trait.Commands.html#method.xclaim_options
247///
248#[derive(Default, Debug)]
249pub struct StreamClaimOptions {
250    /// Set `IDLE <milliseconds>` cmd arg.
251    idle: Option<usize>,
252    /// Set `TIME <Unix epoch milliseconds>` cmd arg.
253    time: Option<usize>,
254    /// Set `RETRYCOUNT <count>` cmd arg.
255    retry: Option<usize>,
256    /// Set `FORCE` cmd arg.
257    force: bool,
258    /// Set `JUSTID` cmd arg. Be advised: the response
259    /// type changes with this option.
260    justid: bool,
261    /// Set `LASTID <lastid>` cmd arg.
262    lastid: Option<String>,
263}
264
265impl StreamClaimOptions {
266    /// Set `IDLE <milliseconds>` cmd arg.
267    pub fn idle(mut self, ms: usize) -> Self {
268        self.idle = Some(ms);
269        self
270    }
271
272    /// Set `TIME <Unix epoch milliseconds>` cmd arg.
273    pub fn time(mut self, ms_time: usize) -> Self {
274        self.time = Some(ms_time);
275        self
276    }
277
278    /// Set `RETRYCOUNT <count>` cmd arg.
279    pub fn retry(mut self, count: usize) -> Self {
280        self.retry = Some(count);
281        self
282    }
283
284    /// Set `FORCE` cmd arg to true.
285    pub fn with_force(mut self) -> Self {
286        self.force = true;
287        self
288    }
289
290    /// Set `JUSTID` cmd arg to true. Be advised: the response
291    /// type changes with this option.
292    pub fn with_justid(mut self) -> Self {
293        self.justid = true;
294        self
295    }
296
297    /// Set `LASTID <lastid>` cmd arg.
298    pub fn with_lastid(mut self, lastid: impl Into<String>) -> Self {
299        self.lastid = Some(lastid.into());
300        self
301    }
302}
303
304impl ToRedisArgs for StreamClaimOptions {
305    fn write_redis_args<W>(&self, out: &mut W)
306    where
307        W: ?Sized + RedisWrite,
308    {
309        if let Some(ref ms) = self.idle {
310            out.write_arg(b"IDLE");
311            out.write_arg(format!("{ms}").as_bytes());
312        }
313        if let Some(ref ms_time) = self.time {
314            out.write_arg(b"TIME");
315            out.write_arg(format!("{ms_time}").as_bytes());
316        }
317        if let Some(ref count) = self.retry {
318            out.write_arg(b"RETRYCOUNT");
319            out.write_arg(format!("{count}").as_bytes());
320        }
321        if self.force {
322            out.write_arg(b"FORCE");
323        }
324        if self.justid {
325            out.write_arg(b"JUSTID");
326        }
327        if let Some(ref lastid) = self.lastid {
328            out.write_arg(b"LASTID");
329            lastid.write_redis_args(out);
330        }
331    }
332}
333
334/// Argument to `StreamReadOptions`
335/// Represents the Redis `GROUP <groupname> <consumername>` cmd arg.
336/// This option will toggle the cmd from `XREAD` to `XREADGROUP`
337type SRGroup = Option<(Vec<Vec<u8>>, Vec<Vec<u8>>)>;
338/// Builder options for [`xread_options`] command.
339///
340/// [`xread_options`]: ../trait.Commands.html#method.xread_options
341///
342#[derive(Default, Debug)]
343pub struct StreamReadOptions {
344    /// Set the `BLOCK <milliseconds>` cmd arg.
345    block: Option<usize>,
346    /// Set the `COUNT <count>` cmd arg.
347    count: Option<usize>,
348    /// Set the `NOACK` cmd arg.
349    noack: Option<bool>,
350    /// Set the `GROUP <groupname> <consumername>` cmd arg.
351    /// This option will toggle the cmd from XREAD to XREADGROUP.
352    group: SRGroup,
353}
354
355impl StreamReadOptions {
356    /// Indicates whether the command is participating in a group
357    /// and generating ACKs
358    pub fn read_only(&self) -> bool {
359        self.group.is_none()
360    }
361
362    /// Sets the command so that it avoids adding the message
363    /// to the PEL in cases where reliability is not a requirement
364    /// and the occasional message loss is acceptable.
365    pub fn noack(mut self) -> Self {
366        self.noack = Some(true);
367        self
368    }
369
370    /// Sets the block time in milliseconds.
371    pub fn block(mut self, ms: usize) -> Self {
372        self.block = Some(ms);
373        self
374    }
375
376    /// Sets the maximum number of elements to return per stream.
377    pub fn count(mut self, n: usize) -> Self {
378        self.count = Some(n);
379        self
380    }
381
382    /// Sets the name of a consumer group associated to the stream.
383    pub fn group<GN: ToRedisArgs, CN: ToRedisArgs>(
384        mut self,
385        group_name: GN,
386        consumer_name: CN,
387    ) -> Self {
388        self.group = Some((
389            ToRedisArgs::to_redis_args(&group_name),
390            ToRedisArgs::to_redis_args(&consumer_name),
391        ));
392        self
393    }
394}
395
396impl ToRedisArgs for StreamReadOptions {
397    fn write_redis_args<W>(&self, out: &mut W)
398    where
399        W: ?Sized + RedisWrite,
400    {
401        if let Some(ref group) = self.group {
402            out.write_arg(b"GROUP");
403            for i in &group.0 {
404                out.write_arg(i);
405            }
406            for i in &group.1 {
407                out.write_arg(i);
408            }
409        }
410
411        if let Some(ref ms) = self.block {
412            out.write_arg(b"BLOCK");
413            out.write_arg(format!("{ms}").as_bytes());
414        }
415
416        if let Some(ref n) = self.count {
417            out.write_arg(b"COUNT");
418            out.write_arg(format!("{n}").as_bytes());
419        }
420
421        if self.group.is_some() {
422            // noack is only available w/ xreadgroup
423            if self.noack == Some(true) {
424                out.write_arg(b"NOACK");
425            }
426        }
427    }
428}
429
430/// Reply type used with the [`xautoclaim_options`] command.
431///
432/// [`xautoclaim_options`]: ../trait.Commands.html#method.xautoclaim_options
433///
434#[derive(Default, Debug, Clone)]
435pub struct StreamAutoClaimReply {
436    /// The next stream id to use as the start argument for the next xautoclaim
437    pub next_stream_id: String,
438    /// The entries claimed for the consumer. When JUSTID is enabled the map in each entry is blank
439    pub claimed: Vec<StreamId>,
440    /// The list of stream ids that were removed due to no longer being in the stream
441    pub deleted_ids: Vec<String>,
442}
443
444/// Reply type used with [`xread`] or [`xread_options`] commands.
445///
446/// [`xread`]: ../trait.Commands.html#method.xread
447/// [`xread_options`]: ../trait.Commands.html#method.xread_options
448///
449#[derive(Default, Debug, Clone)]
450pub struct StreamReadReply {
451    /// Complex data structure containing a payload for each key in this array
452    pub keys: Vec<StreamKey>,
453}
454
455/// Reply type used with [`xrange`], [`xrange_count`], [`xrange_all`], [`xrevrange`], [`xrevrange_count`], [`xrevrange_all`] commands.
456///
457/// Represents stream entries matching a given range of `id`'s.
458///
459/// [`xrange`]: ../trait.Commands.html#method.xrange
460/// [`xrange_count`]: ../trait.Commands.html#method.xrange_count
461/// [`xrange_all`]: ../trait.Commands.html#method.xrange_all
462/// [`xrevrange`]: ../trait.Commands.html#method.xrevrange
463/// [`xrevrange_count`]: ../trait.Commands.html#method.xrevrange_count
464/// [`xrevrange_all`]: ../trait.Commands.html#method.xrevrange_all
465///
466#[derive(Default, Debug, Clone)]
467pub struct StreamRangeReply {
468    /// Complex data structure containing a payload for each ID in this array
469    pub ids: Vec<StreamId>,
470}
471
472/// Reply type used with [`xclaim`] command.
473///
474/// Represents that ownership of the specified messages was changed.
475///
476/// [`xclaim`]: ../trait.Commands.html#method.xclaim
477///
478#[derive(Default, Debug, Clone)]
479pub struct StreamClaimReply {
480    /// Complex data structure containing a payload for each ID in this array
481    pub ids: Vec<StreamId>,
482}
483
484/// Reply type used with [`xpending`] command.
485///
486/// Data returned here were fetched from the stream without
487/// having been acknowledged.
488///
489/// [`xpending`]: ../trait.Commands.html#method.xpending
490///
491#[derive(Debug, Clone, Default)]
492pub enum StreamPendingReply {
493    /// The stream is empty.
494    #[default]
495    Empty,
496    /// Data with payload exists in the stream.
497    Data(StreamPendingData),
498}
499
500impl StreamPendingReply {
501    /// Returns how many records are in the reply.
502    pub fn count(&self) -> usize {
503        match self {
504            StreamPendingReply::Empty => 0,
505            StreamPendingReply::Data(x) => x.count,
506        }
507    }
508}
509
510/// Inner reply type when an [`xpending`] command has data.
511///
512/// [`xpending`]: ../trait.Commands.html#method.xpending
513#[derive(Default, Debug, Clone)]
514pub struct StreamPendingData {
515    /// Limit on the number of messages to return per call.
516    pub count: usize,
517    /// ID for the first pending record.
518    pub start_id: String,
519    /// ID for the final pending record.
520    pub end_id: String,
521    /// Every consumer in the consumer group with at
522    /// least one pending message,
523    /// and the number of pending messages it has.
524    pub consumers: Vec<StreamInfoConsumer>,
525}
526
527/// Reply type used with [`xpending_count`] and
528/// [`xpending_consumer_count`] commands.
529///
530/// Data returned here have been fetched from the stream without
531/// any acknowledgement.
532///
533/// [`xpending_count`]: ../trait.Commands.html#method.xpending_count
534/// [`xpending_consumer_count`]: ../trait.Commands.html#method.xpending_consumer_count
535///
536#[derive(Default, Debug, Clone)]
537pub struct StreamPendingCountReply {
538    /// An array of structs containing information about
539    /// message IDs yet to be acknowledged by various consumers,
540    /// time since last ack, and total number of acks by that consumer.
541    pub ids: Vec<StreamPendingId>,
542}
543
544/// Reply type used with [`xinfo_stream`] command, containing
545/// general information about the stream stored at the specified key.
546///
547/// The very first and last IDs in the stream are shown,
548/// in order to give some sense about what is the stream content.
549///
550/// [`xinfo_stream`]: ../trait.Commands.html#method.xinfo_stream
551///
552#[derive(Default, Debug, Clone)]
553pub struct StreamInfoStreamReply {
554    /// The last generated ID that may not be the same as the last
555    /// entry ID in case some entry was deleted.
556    pub last_generated_id: String,
557    /// Details about the radix tree representing the stream mostly
558    /// useful for optimization and debugging tasks.
559    pub radix_tree_keys: usize,
560    /// The number of consumer groups associated with the stream.
561    pub groups: usize,
562    /// Number of elements of the stream.
563    pub length: usize,
564    /// The very first entry in the stream.
565    pub first_entry: StreamId,
566    /// The very last entry in the stream.
567    pub last_entry: StreamId,
568}
569
570/// Reply type used with [`xinfo_consumer`] command, an array of every
571/// consumer in a specific consumer group.
572///
573/// [`xinfo_consumer`]: ../trait.Commands.html#method.xinfo_consumer
574///
575#[derive(Default, Debug, Clone)]
576pub struct StreamInfoConsumersReply {
577    /// An array of every consumer in a specific consumer group.
578    pub consumers: Vec<StreamInfoConsumer>,
579}
580
581/// Reply type used with [`xinfo_groups`] command.
582///
583/// This output represents all the consumer groups associated with
584/// the stream.
585///
586/// [`xinfo_groups`]: ../trait.Commands.html#method.xinfo_groups
587///
588#[derive(Default, Debug, Clone)]
589pub struct StreamInfoGroupsReply {
590    /// All the consumer groups associated with the stream.
591    pub groups: Vec<StreamInfoGroup>,
592}
593
594/// A consumer parsed from [`xinfo_consumers`] command.
595///
596/// [`xinfo_consumers`]: ../trait.Commands.html#method.xinfo_consumers
597///
598#[derive(Default, Debug, Clone)]
599pub struct StreamInfoConsumer {
600    /// Name of the consumer group.
601    pub name: String,
602    /// Number of pending messages for this specific consumer.
603    pub pending: usize,
604    /// This consumer's idle time in milliseconds.
605    pub idle: usize,
606}
607
608/// A group parsed from [`xinfo_groups`] command.
609///
610/// [`xinfo_groups`]: ../trait.Commands.html#method.xinfo_groups
611///
612#[derive(Default, Debug, Clone)]
613pub struct StreamInfoGroup {
614    /// The group name.
615    pub name: String,
616    /// Number of consumers known in the group.
617    pub consumers: usize,
618    /// Number of pending messages (delivered but not yet acknowledged) in the group.
619    pub pending: usize,
620    /// Last ID delivered to this group.
621    pub last_delivered_id: String,
622    /// The logical "read counter" of the last entry delivered to group's consumers
623    /// (or `None` if the server does not provide the value).
624    pub entries_read: Option<usize>,
625    /// The number of entries in the stream that are still waiting to be delivered to the
626    /// group's consumers, or a `None` when that number can't be determined.
627    pub lag: Option<usize>,
628}
629
630/// Represents a pending message parsed from [`xpending`] methods.
631///
632/// [`xpending`]: ../trait.Commands.html#method.xpending
633#[derive(Default, Debug, Clone)]
634pub struct StreamPendingId {
635    /// The ID of the message.
636    pub id: String,
637    /// The name of the consumer that fetched the message and has
638    /// still to acknowledge it. We call it the current owner
639    /// of the message.
640    pub consumer: String,
641    /// The number of milliseconds that elapsed since the
642    /// last time this message was delivered to this consumer.
643    pub last_delivered_ms: usize,
644    /// The number of times this message was delivered.
645    pub times_delivered: usize,
646}
647
648/// Represents a stream `key` and its `id`'s parsed from `xread` methods.
649#[derive(Default, Debug, Clone)]
650pub struct StreamKey {
651    /// The stream `key`.
652    pub key: String,
653    /// The parsed stream `id`'s.
654    pub ids: Vec<StreamId>,
655}
656
657/// Represents a stream `id` and its field/values as a `HashMap`
658#[derive(Default, Debug, Clone)]
659pub struct StreamId {
660    /// The stream `id` (entry ID) of this particular message.
661    pub id: String,
662    /// All fields in this message, associated with their respective values.
663    pub map: HashMap<String, Value>,
664}
665
666impl StreamId {
667    /// Converts a `Value::Array` into a `StreamId`.
668    fn from_array_value(v: &Value) -> RedisResult<Self> {
669        let mut stream_id = StreamId::default();
670        if let Value::Array(ref values) = *v {
671            if let Some(v) = values.first() {
672                stream_id.id = from_redis_value(v)?;
673            }
674            if let Some(v) = values.get(1) {
675                stream_id.map = from_redis_value(v)?;
676            }
677        }
678
679        Ok(stream_id)
680    }
681
682    /// Fetches value of a given field and converts it to the specified
683    /// type.
684    pub fn get<T: FromRedisValue>(&self, key: &str) -> Option<T> {
685        match self.map.get(key) {
686            Some(x) => from_redis_value(x).ok(),
687            None => None,
688        }
689    }
690
691    /// Does the message contain a particular field?
692    pub fn contains_key(&self, key: &str) -> bool {
693        self.map.contains_key(key)
694    }
695
696    /// Returns how many field/value pairs exist in this message.
697    pub fn len(&self) -> usize {
698        self.map.len()
699    }
700
701    /// Returns true if there are no field/value pairs in this message.
702    pub fn is_empty(&self) -> bool {
703        self.len() == 0
704    }
705}
706
707type SACRows = Vec<HashMap<String, HashMap<String, Value>>>;
708
709impl FromRedisValue for StreamAutoClaimReply {
710    fn from_redis_value(v: &Value) -> RedisResult<Self> {
711        match *v {
712            Value::Array(ref items) => {
713                if let 2..=3 = items.len() {
714                    let deleted_ids = if let Some(o) = items.get(2) {
715                        from_redis_value(o)?
716                    } else {
717                        Vec::new()
718                    };
719
720                    let claimed: Vec<StreamId> = match &items[1] {
721                        // JUSTID response
722                        Value::Array(x)
723                            if matches!(x.first(), None | Some(Value::BulkString(_))) =>
724                        {
725                            let ids: Vec<String> = from_redis_value(&items[1])?;
726
727                            ids.into_iter()
728                                .map(|id| StreamId {
729                                    id,
730                                    ..Default::default()
731                                })
732                                .collect()
733                        }
734                        // full response
735                        Value::Array(x) if matches!(x.first(), Some(Value::Array(_))) => {
736                            let rows: SACRows = from_redis_value(&items[1])?;
737
738                            rows.into_iter()
739                                .flat_map(|id_row| {
740                                    id_row.into_iter().map(|(id, map)| StreamId { id, map })
741                                })
742                                .collect()
743                        }
744                        _ => invalid_type_error!("Incorrect type", &items[1]),
745                    };
746
747                    Ok(Self {
748                        next_stream_id: from_redis_value(&items[0])?,
749                        claimed,
750                        deleted_ids,
751                    })
752                } else {
753                    invalid_type_error!("Wrong number of entries in array response", v)
754                }
755            }
756            _ => invalid_type_error!("Not a array response", v),
757        }
758    }
759}
760
761type SRRows = Vec<HashMap<String, Vec<HashMap<String, HashMap<String, Value>>>>>;
762impl FromRedisValue for StreamReadReply {
763    fn from_redis_value(v: &Value) -> RedisResult<Self> {
764        let rows: SRRows = from_redis_value(v)?;
765        let keys = rows
766            .into_iter()
767            .flat_map(|row| {
768                row.into_iter().map(|(key, entry)| {
769                    let ids = entry
770                        .into_iter()
771                        .flat_map(|id_row| id_row.into_iter().map(|(id, map)| StreamId { id, map }))
772                        .collect();
773                    StreamKey { key, ids }
774                })
775            })
776            .collect();
777        Ok(StreamReadReply { keys })
778    }
779}
780
781impl FromRedisValue for StreamRangeReply {
782    fn from_redis_value(v: &Value) -> RedisResult<Self> {
783        let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
784        let ids: Vec<StreamId> = rows
785            .into_iter()
786            .flat_map(|row| row.into_iter().map(|(id, map)| StreamId { id, map }))
787            .collect();
788        Ok(StreamRangeReply { ids })
789    }
790}
791
792impl FromRedisValue for StreamClaimReply {
793    fn from_redis_value(v: &Value) -> RedisResult<Self> {
794        let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
795        let ids: Vec<StreamId> = rows
796            .into_iter()
797            .flat_map(|row| row.into_iter().map(|(id, map)| StreamId { id, map }))
798            .collect();
799        Ok(StreamClaimReply { ids })
800    }
801}
802
803type SPRInner = (
804    usize,
805    Option<String>,
806    Option<String>,
807    Vec<Option<(String, String)>>,
808);
809impl FromRedisValue for StreamPendingReply {
810    fn from_redis_value(v: &Value) -> RedisResult<Self> {
811        let (count, start, end, consumer_data): SPRInner = from_redis_value(v)?;
812
813        if count == 0 {
814            Ok(StreamPendingReply::Empty)
815        } else {
816            let mut result = StreamPendingData::default();
817
818            let start_id = start.ok_or_else(|| {
819                Error::new(
820                    ErrorKind::Other,
821                    "IllegalState: Non-zero pending expects start id",
822                )
823            })?;
824
825            let end_id = end.ok_or_else(|| {
826                Error::new(
827                    ErrorKind::Other,
828                    "IllegalState: Non-zero pending expects end id",
829                )
830            })?;
831
832            result.count = count;
833            result.start_id = start_id;
834            result.end_id = end_id;
835
836            result.consumers = consumer_data
837                .into_iter()
838                .flatten()
839                .map(|(name, pending)| StreamInfoConsumer {
840                    name,
841                    pending: pending.parse().unwrap_or_default(),
842                    ..Default::default()
843                })
844                .collect();
845
846            Ok(StreamPendingReply::Data(result))
847        }
848    }
849}
850
851impl FromRedisValue for StreamPendingCountReply {
852    fn from_redis_value(v: &Value) -> RedisResult<Self> {
853        let mut reply = StreamPendingCountReply::default();
854        match v {
855            Value::Array(outer_tuple) => {
856                for outer in outer_tuple {
857                    match outer {
858                        Value::Array(inner_tuple) => match &inner_tuple[..] {
859                            [Value::BulkString(id_bytes), Value::BulkString(consumer_bytes), Value::Int(last_delivered_ms_u64), Value::Int(times_delivered_u64)] =>
860                            {
861                                let id = String::from_utf8(id_bytes.to_vec())?;
862                                let consumer = String::from_utf8(consumer_bytes.to_vec())?;
863                                let last_delivered_ms = *last_delivered_ms_u64 as usize;
864                                let times_delivered = *times_delivered_u64 as usize;
865                                reply.ids.push(StreamPendingId {
866                                    id,
867                                    consumer,
868                                    last_delivered_ms,
869                                    times_delivered,
870                                });
871                            }
872                            _ => fail!((
873                                crate::types::ErrorKind::TypeError,
874                                "Cannot parse redis data (3)"
875                            )),
876                        },
877                        _ => fail!((
878                            crate::types::ErrorKind::TypeError,
879                            "Cannot parse redis data (2)"
880                        )),
881                    }
882                }
883            }
884            _ => fail!((
885                crate::types::ErrorKind::TypeError,
886                "Cannot parse redis data (1)"
887            )),
888        };
889        Ok(reply)
890    }
891}
892
893impl FromRedisValue for StreamInfoStreamReply {
894    fn from_redis_value(v: &Value) -> RedisResult<Self> {
895        let map: HashMap<String, Value> = from_redis_value(v)?;
896        let mut reply = StreamInfoStreamReply::default();
897        if let Some(v) = &map.get("last-generated-id") {
898            reply.last_generated_id = from_redis_value(v)?;
899        }
900        if let Some(v) = &map.get("radix-tree-nodes") {
901            reply.radix_tree_keys = from_redis_value(v)?;
902        }
903        if let Some(v) = &map.get("groups") {
904            reply.groups = from_redis_value(v)?;
905        }
906        if let Some(v) = &map.get("length") {
907            reply.length = from_redis_value(v)?;
908        }
909        if let Some(v) = &map.get("first-entry") {
910            reply.first_entry = StreamId::from_array_value(v)?;
911        }
912        if let Some(v) = &map.get("last-entry") {
913            reply.last_entry = StreamId::from_array_value(v)?;
914        }
915        Ok(reply)
916    }
917}
918
919impl FromRedisValue for StreamInfoConsumersReply {
920    fn from_redis_value(v: &Value) -> RedisResult<Self> {
921        let consumers: Vec<HashMap<String, Value>> = from_redis_value(v)?;
922        let mut reply = StreamInfoConsumersReply::default();
923        for map in consumers {
924            let mut c = StreamInfoConsumer::default();
925            if let Some(v) = &map.get("name") {
926                c.name = from_redis_value(v)?;
927            }
928            if let Some(v) = &map.get("pending") {
929                c.pending = from_redis_value(v)?;
930            }
931            if let Some(v) = &map.get("idle") {
932                c.idle = from_redis_value(v)?;
933            }
934            reply.consumers.push(c);
935        }
936
937        Ok(reply)
938    }
939}
940
941impl FromRedisValue for StreamInfoGroupsReply {
942    fn from_redis_value(v: &Value) -> RedisResult<Self> {
943        let groups: Vec<HashMap<String, Value>> = from_redis_value(v)?;
944        let mut reply = StreamInfoGroupsReply::default();
945        for map in groups {
946            let mut g = StreamInfoGroup::default();
947            if let Some(v) = &map.get("name") {
948                g.name = from_redis_value(v)?;
949            }
950            if let Some(v) = &map.get("pending") {
951                g.pending = from_redis_value(v)?;
952            }
953            if let Some(v) = &map.get("consumers") {
954                g.consumers = from_redis_value(v)?;
955            }
956            if let Some(v) = &map.get("last-delivered-id") {
957                g.last_delivered_id = from_redis_value(v)?;
958            }
959            if let Some(v) = &map.get("entries-read") {
960                g.entries_read = if let Value::Nil = v {
961                    None
962                } else {
963                    Some(from_redis_value(v)?)
964                };
965            }
966            if let Some(v) = &map.get("lag") {
967                g.lag = if let Value::Nil = v {
968                    None
969                } else {
970                    Some(from_redis_value(v)?)
971                };
972            }
973            reply.groups.push(g);
974        }
975        Ok(reply)
976    }
977}
978
979#[cfg(test)]
980mod tests {
981    use super::*;
982
983    fn assert_command_eq(object: impl ToRedisArgs, expected: &[u8]) {
984        let mut out: Vec<Vec<u8>> = Vec::new();
985
986        object.write_redis_args(&mut out);
987
988        let mut cmd: Vec<u8> = Vec::new();
989
990        out.iter_mut().for_each(|item| {
991            cmd.append(item);
992            cmd.push(b' ');
993        });
994
995        cmd.pop();
996
997        assert_eq!(cmd, expected);
998    }
999
1000    mod stream_auto_claim_reply {
1001        use super::*;
1002        use crate::Value;
1003
1004        #[test]
1005        fn short_response() {
1006            let value = Value::Array(vec![Value::BulkString("1713465536578-0".into())]);
1007
1008            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1009
1010            assert!(reply.is_err());
1011        }
1012
1013        #[test]
1014        fn parses_none_claimed_response() {
1015            let value = Value::Array(vec![
1016                Value::BulkString("0-0".into()),
1017                Value::Array(vec![]),
1018                Value::Array(vec![]),
1019            ]);
1020
1021            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1022
1023            assert!(reply.is_ok());
1024
1025            let reply = reply.unwrap();
1026
1027            assert_eq!(reply.next_stream_id.as_str(), "0-0");
1028            assert_eq!(reply.claimed.len(), 0);
1029            assert_eq!(reply.deleted_ids.len(), 0);
1030        }
1031
1032        #[test]
1033        fn parses_response() {
1034            let value = Value::Array(vec![
1035                Value::BulkString("1713465536578-0".into()),
1036                Value::Array(vec![
1037                    Value::Array(vec![
1038                        Value::BulkString("1713465533411-0".into()),
1039                        // Both RESP2 and RESP3 expose this map as an array of key/values
1040                        Value::Array(vec![
1041                            Value::BulkString("name".into()),
1042                            Value::BulkString("test".into()),
1043                            Value::BulkString("other".into()),
1044                            Value::BulkString("whaterver".into()),
1045                        ]),
1046                    ]),
1047                    Value::Array(vec![
1048                        Value::BulkString("1713465536069-0".into()),
1049                        Value::Array(vec![
1050                            Value::BulkString("name".into()),
1051                            Value::BulkString("another test".into()),
1052                            Value::BulkString("other".into()),
1053                            Value::BulkString("something".into()),
1054                        ]),
1055                    ]),
1056                ]),
1057                Value::Array(vec![Value::BulkString("123456789-0".into())]),
1058            ]);
1059
1060            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1061
1062            assert!(reply.is_ok());
1063
1064            let reply = reply.unwrap();
1065
1066            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1067            assert_eq!(reply.claimed.len(), 2);
1068            assert_eq!(reply.claimed[0].id.as_str(), "1713465533411-0");
1069            assert!(
1070                matches!(reply.claimed[0].map.get("name"), Some(Value::BulkString(v)) if v == "test".as_bytes())
1071            );
1072            assert_eq!(reply.claimed[1].id.as_str(), "1713465536069-0");
1073            assert_eq!(reply.deleted_ids.len(), 1);
1074            assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
1075        }
1076
1077        #[test]
1078        fn parses_v6_response() {
1079            let value = Value::Array(vec![
1080                Value::BulkString("1713465536578-0".into()),
1081                Value::Array(vec![
1082                    Value::Array(vec![
1083                        Value::BulkString("1713465533411-0".into()),
1084                        Value::Array(vec![
1085                            Value::BulkString("name".into()),
1086                            Value::BulkString("test".into()),
1087                            Value::BulkString("other".into()),
1088                            Value::BulkString("whaterver".into()),
1089                        ]),
1090                    ]),
1091                    Value::Array(vec![
1092                        Value::BulkString("1713465536069-0".into()),
1093                        Value::Array(vec![
1094                            Value::BulkString("name".into()),
1095                            Value::BulkString("another test".into()),
1096                            Value::BulkString("other".into()),
1097                            Value::BulkString("something".into()),
1098                        ]),
1099                    ]),
1100                ]),
1101                // V6 and lower lack the deleted_ids array
1102            ]);
1103
1104            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1105
1106            assert!(reply.is_ok());
1107
1108            let reply = reply.unwrap();
1109
1110            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1111            assert_eq!(reply.claimed.len(), 2);
1112            let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1113            assert!(ids.contains(&"1713465533411-0"));
1114            assert!(ids.contains(&"1713465536069-0"));
1115            assert_eq!(reply.deleted_ids.len(), 0);
1116        }
1117
1118        #[test]
1119        fn parses_justid_response() {
1120            let value = Value::Array(vec![
1121                Value::BulkString("1713465536578-0".into()),
1122                Value::Array(vec![
1123                    Value::BulkString("1713465533411-0".into()),
1124                    Value::BulkString("1713465536069-0".into()),
1125                ]),
1126                Value::Array(vec![Value::BulkString("123456789-0".into())]),
1127            ]);
1128
1129            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1130
1131            assert!(reply.is_ok());
1132
1133            let reply = reply.unwrap();
1134
1135            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1136            assert_eq!(reply.claimed.len(), 2);
1137            let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1138            assert!(ids.contains(&"1713465533411-0"));
1139            assert!(ids.contains(&"1713465536069-0"));
1140            assert_eq!(reply.deleted_ids.len(), 1);
1141            assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
1142        }
1143
1144        #[test]
1145        fn parses_v6_justid_response() {
1146            let value = Value::Array(vec![
1147                Value::BulkString("1713465536578-0".into()),
1148                Value::Array(vec![
1149                    Value::BulkString("1713465533411-0".into()),
1150                    Value::BulkString("1713465536069-0".into()),
1151                ]),
1152                // V6 and lower lack the deleted_ids array
1153            ]);
1154
1155            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1156
1157            assert!(reply.is_ok());
1158
1159            let reply = reply.unwrap();
1160
1161            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1162            assert_eq!(reply.claimed.len(), 2);
1163            let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1164            assert!(ids.contains(&"1713465533411-0"));
1165            assert!(ids.contains(&"1713465536069-0"));
1166            assert_eq!(reply.deleted_ids.len(), 0);
1167        }
1168    }
1169
1170    mod stream_trim_options {
1171        use super::*;
1172
1173        #[test]
1174        fn maxlen_trim() {
1175            let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10);
1176
1177            assert_command_eq(options, b"MAXLEN ~ 10");
1178        }
1179
1180        #[test]
1181        fn maxlen_exact_trim() {
1182            let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Exact, 10);
1183
1184            assert_command_eq(options, b"MAXLEN = 10");
1185        }
1186
1187        #[test]
1188        fn maxlen_trim_limit() {
1189            let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10).limit(5);
1190
1191            assert_command_eq(options, b"MAXLEN ~ 10 LIMIT 5");
1192        }
1193        #[test]
1194        fn minid_trim_limit() {
1195            let options = StreamTrimOptions::minid(StreamTrimmingMode::Exact, "123456-7").limit(5);
1196
1197            assert_command_eq(options, b"MINID = 123456-7 LIMIT 5");
1198        }
1199    }
1200
1201    mod stream_add_options {
1202        use super::*;
1203
1204        #[test]
1205        fn the_default() {
1206            let options = StreamAddOptions::default();
1207
1208            assert_command_eq(options, b"");
1209        }
1210
1211        #[test]
1212        fn with_maxlen_trim() {
1213            let options = StreamAddOptions::default()
1214                .trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10));
1215
1216            assert_command_eq(options, b"MAXLEN = 10");
1217        }
1218
1219        #[test]
1220        fn with_nomkstream() {
1221            let options = StreamAddOptions::default().nomkstream();
1222
1223            assert_command_eq(options, b"NOMKSTREAM");
1224        }
1225
1226        #[test]
1227        fn with_nomkstream_and_maxlen_trim() {
1228            let options = StreamAddOptions::default()
1229                .nomkstream()
1230                .trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10));
1231
1232            assert_command_eq(options, b"NOMKSTREAM MAXLEN = 10");
1233        }
1234    }
1235}