redis/
streams.rs

1//! Defines types to use with the streams commands.
2
3use crate::{
4    from_redis_value, types::HashMap, ErrorKind, FromRedisValue, RedisError, RedisResult,
5    RedisWrite, ToRedisArgs, Value,
6};
7
8use std::io::Error;
9
// Reports an incompatible response through the crate's `fail!` macro
// (defined elsewhere; presumably an early `return Err(..)`).
//
// `$value` is the offending response and `$detail` describes the mismatch.
// The detail string renders as `<detail> (response was <value>)`, with both
// parts formatted using `{:?}`.
macro_rules! invalid_type_error {
    ($value:expr, $detail:expr) => {{
        fail!((
            $crate::ErrorKind::TypeError,
            "Response was of incompatible type",
            format!("{:?} (response was {:?})", $detail, $value)
        ));
    }};
}
19
20// Stream Maxlen Enum
21
/// Argument helper that renders as `MAXLEN [= or ~] [COUNT]` when passed
/// to `StreamCommands`.
///
/// The wrapped `usize` is the count; the variant selects `=` or `~`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamMaxlen {
    /// Exact count (`=`).
    Equals(usize),
    /// Approximate count (`~`).
    Approx(usize),
}
32
33impl ToRedisArgs for StreamMaxlen {
34    fn write_redis_args<W>(&self, out: &mut W)
35    where
36        W: ?Sized + RedisWrite,
37    {
38        let (ch, val) = match *self {
39            StreamMaxlen::Equals(v) => ("=", v),
40            StreamMaxlen::Approx(v) => ("~", v),
41        };
42        out.write_arg(b"MAXLEN");
43        out.write_arg(ch.as_bytes());
44        val.write_redis_args(out);
45    }
46}
47
/// Trim mode (`=` or `~`) passed as an argument into `StreamCommands`.
#[derive(Debug)]
pub enum StreamTrimmingMode {
    /// Exact trimming (`=`).
    Exact,
    /// Approximate trimming (`~`).
    Approx,
}
57
58impl ToRedisArgs for StreamTrimmingMode {
59    fn write_redis_args<W>(&self, out: &mut W)
60    where
61        W: ?Sized + RedisWrite,
62    {
63        match self {
64            Self::Exact => out.write_arg(b"="),
65            Self::Approx => out.write_arg(b"~"),
66        };
67    }
68}
69
70/// Utility enum for passing `<MAXLEN|MINID> [=|~] threshold [LIMIT count]`
71/// arguments into `StreamCommands`.
72/// The enum values the trimming mode (=|~), the threshold, and the optional limit
73#[derive(Debug)]
74pub enum StreamTrimStrategy {
75    /// Evicts entries as long as the streams length exceeds threshold.  With an optional limit.
76    MaxLen(StreamTrimmingMode, usize, Option<usize>),
77    /// Evicts entries with IDs lower than threshold, where threshold is a stream ID With an optional limit.
78    MinId(StreamTrimmingMode, String, Option<usize>),
79}
80
81impl StreamTrimStrategy {
82    /// Define a MAXLEN trim strategy with the given maximum number of entries
83    pub fn maxlen(trim: StreamTrimmingMode, max_entries: usize) -> Self {
84        Self::MaxLen(trim, max_entries, None)
85    }
86
87    /// Defines a MINID trim strategy with the given minimum stream ID
88    pub fn minid(trim: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
89        Self::MinId(trim, stream_id.into(), None)
90    }
91
92    /// Set a limit to the number of records to trim in a single operation
93    pub fn limit(self, limit: usize) -> Self {
94        match self {
95            StreamTrimStrategy::MaxLen(m, t, _) => StreamTrimStrategy::MaxLen(m, t, Some(limit)),
96            StreamTrimStrategy::MinId(m, t, _) => StreamTrimStrategy::MinId(m, t, Some(limit)),
97        }
98    }
99}
100
101impl ToRedisArgs for StreamTrimStrategy {
102    fn write_redis_args<W>(&self, out: &mut W)
103    where
104        W: ?Sized + RedisWrite,
105    {
106        let limit = match self {
107            StreamTrimStrategy::MaxLen(m, t, limit) => {
108                out.write_arg(b"MAXLEN");
109                m.write_redis_args(out);
110                t.write_redis_args(out);
111                limit
112            }
113            StreamTrimStrategy::MinId(m, t, limit) => {
114                out.write_arg(b"MINID");
115                m.write_redis_args(out);
116                t.write_redis_args(out);
117                limit
118            }
119        };
120        if let Some(limit) = limit {
121            out.write_arg(b"LIMIT");
122            limit.write_redis_args(out);
123        }
124    }
125}
126
127/// Builder options for [`xtrim_options`] command
128///
129/// [`xtrim_options`]: ../trait.Commands.html#method.xtrim_options
130///
131#[derive(Debug)]
132pub struct StreamTrimOptions {
133    strategy: StreamTrimStrategy,
134    deletion_policy: Option<StreamDeletionPolicy>,
135}
136
137impl StreamTrimOptions {
138    /// Define a MAXLEN trim strategy with the given maximum number of entries
139    pub fn maxlen(mode: StreamTrimmingMode, max_entries: usize) -> Self {
140        Self {
141            strategy: StreamTrimStrategy::maxlen(mode, max_entries),
142            deletion_policy: None,
143        }
144    }
145
146    /// Defines a MINID trim strategy with the given minimum stream ID
147    pub fn minid(mode: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
148        Self {
149            strategy: StreamTrimStrategy::minid(mode, stream_id),
150            deletion_policy: None,
151        }
152    }
153
154    /// Set a limit to the number of records to trim in a single operation
155    pub fn limit(mut self, limit: usize) -> Self {
156        self.strategy = self.strategy.limit(limit);
157        self
158    }
159
160    /// Set the deletion policy for the XTRIM operation
161    pub fn set_deletion_policy(mut self, deletion_policy: StreamDeletionPolicy) -> Self {
162        self.deletion_policy = Some(deletion_policy);
163        self
164    }
165}
166
167impl ToRedisArgs for StreamTrimOptions {
168    fn write_redis_args<W>(&self, out: &mut W)
169    where
170        W: ?Sized + RedisWrite,
171    {
172        self.strategy.write_redis_args(out);
173        if let Some(deletion_policy) = self.deletion_policy.as_ref() {
174            deletion_policy.write_redis_args(out);
175        }
176    }
177}
178
179/// Builder options for [`xadd_options`] command
180///
181/// [`xadd_options`]: ../trait.Commands.html#method.xadd_options
182///
183#[derive(Default, Debug)]
184pub struct StreamAddOptions {
185    nomkstream: bool,
186    trim: Option<StreamTrimStrategy>,
187    deletion_policy: Option<StreamDeletionPolicy>,
188}
189
190impl StreamAddOptions {
191    /// Set the NOMKSTREAM flag on which prevents creating a stream for the XADD operation
192    pub fn nomkstream(mut self) -> Self {
193        self.nomkstream = true;
194        self
195    }
196
197    /// Enable trimming when adding using the given trim strategy
198    pub fn trim(mut self, trim: StreamTrimStrategy) -> Self {
199        self.trim = Some(trim);
200        self
201    }
202
203    /// Set the deletion policy for the XADD operation
204    pub fn set_deletion_policy(mut self, deletion_policy: StreamDeletionPolicy) -> Self {
205        self.deletion_policy = Some(deletion_policy);
206        self
207    }
208}
209
210impl ToRedisArgs for StreamAddOptions {
211    fn write_redis_args<W>(&self, out: &mut W)
212    where
213        W: ?Sized + RedisWrite,
214    {
215        if self.nomkstream {
216            out.write_arg(b"NOMKSTREAM");
217        }
218        if let Some(strategy) = self.trim.as_ref() {
219            strategy.write_redis_args(out);
220        }
221        if let Some(deletion_policy) = self.deletion_policy.as_ref() {
222            deletion_policy.write_redis_args(out);
223        }
224    }
225}
226
/// Options builder for the [`xautoclaim_options`] command.
///
/// [`xautoclaim_options`]: ../trait.Commands.html#method.xautoclaim_options
///
#[derive(Debug, Default)]
pub struct StreamAutoClaimOptions {
    /// Value for the `COUNT` cmd arg, if set.
    count: Option<usize>,
    /// Whether the `JUSTID` cmd arg is written.
    justid: bool,
}
236
237impl StreamAutoClaimOptions {
238    /// Sets the maximum number of elements to claim per stream.
239    pub fn count(mut self, n: usize) -> Self {
240        self.count = Some(n);
241        self
242    }
243
244    /// Set `JUSTID` cmd arg to true. Be advised: the response
245    /// type changes with this option.
246    pub fn with_justid(mut self) -> Self {
247        self.justid = true;
248        self
249    }
250}
251
252impl ToRedisArgs for StreamAutoClaimOptions {
253    fn write_redis_args<W>(&self, out: &mut W)
254    where
255        W: ?Sized + RedisWrite,
256    {
257        if let Some(ref count) = self.count {
258            out.write_arg(b"COUNT");
259            out.write_arg(format!("{count}").as_bytes());
260        }
261        if self.justid {
262            out.write_arg(b"JUSTID");
263        }
264    }
265}
266
/// Options builder for the [`xclaim_options`] command.
///
/// [`xclaim_options`]: ../trait.Commands.html#method.xclaim_options
///
#[derive(Debug, Default)]
pub struct StreamClaimOptions {
    /// Value for the `IDLE <milliseconds>` cmd arg.
    idle: Option<usize>,
    /// Value for the `TIME <Unix epoch milliseconds>` cmd arg.
    time: Option<usize>,
    /// Value for the `RETRYCOUNT <count>` cmd arg.
    retry: Option<usize>,
    /// Whether the `FORCE` cmd arg is written.
    force: bool,
    /// Whether the `JUSTID` cmd arg is written. Be advised: the response
    /// type changes with this option.
    justid: bool,
    /// Value for the `LASTID <lastid>` cmd arg.
    lastid: Option<String>,
}
287
288impl StreamClaimOptions {
289    /// Set `IDLE <milliseconds>` cmd arg.
290    pub fn idle(mut self, ms: usize) -> Self {
291        self.idle = Some(ms);
292        self
293    }
294
295    /// Set `TIME <Unix epoch milliseconds>` cmd arg.
296    pub fn time(mut self, ms_time: usize) -> Self {
297        self.time = Some(ms_time);
298        self
299    }
300
301    /// Set `RETRYCOUNT <count>` cmd arg.
302    pub fn retry(mut self, count: usize) -> Self {
303        self.retry = Some(count);
304        self
305    }
306
307    /// Set `FORCE` cmd arg to true.
308    pub fn with_force(mut self) -> Self {
309        self.force = true;
310        self
311    }
312
313    /// Set `JUSTID` cmd arg to true. Be advised: the response
314    /// type changes with this option.
315    pub fn with_justid(mut self) -> Self {
316        self.justid = true;
317        self
318    }
319
320    /// Set `LASTID <lastid>` cmd arg.
321    pub fn with_lastid(mut self, lastid: impl Into<String>) -> Self {
322        self.lastid = Some(lastid.into());
323        self
324    }
325}
326
327impl ToRedisArgs for StreamClaimOptions {
328    fn write_redis_args<W>(&self, out: &mut W)
329    where
330        W: ?Sized + RedisWrite,
331    {
332        if let Some(ref ms) = self.idle {
333            out.write_arg(b"IDLE");
334            out.write_arg(format!("{ms}").as_bytes());
335        }
336        if let Some(ref ms_time) = self.time {
337            out.write_arg(b"TIME");
338            out.write_arg(format!("{ms_time}").as_bytes());
339        }
340        if let Some(ref count) = self.retry {
341            out.write_arg(b"RETRYCOUNT");
342            out.write_arg(format!("{count}").as_bytes());
343        }
344        if self.force {
345            out.write_arg(b"FORCE");
346        }
347        if self.justid {
348            out.write_arg(b"JUSTID");
349        }
350        if let Some(ref lastid) = self.lastid {
351            out.write_arg(b"LASTID");
352            lastid.write_redis_args(out);
353        }
354    }
355}
356
/// Group argument stored in `StreamReadOptions`.
/// Holds the already-serialized parts of the Redis `GROUP <groupname> <consumername>` cmd arg.
/// Setting it toggles the cmd from `XREAD` to `XREADGROUP`.
type SRGroup = Option<(Vec<Vec<u8>>, Vec<Vec<u8>>)>;
/// Options builder for the [`xread_options`] command.
///
/// [`xread_options`]: ../trait.Commands.html#method.xread_options
///
#[derive(Debug, Default)]
pub struct StreamReadOptions {
    /// Value for the `BLOCK <milliseconds>` cmd arg.
    block: Option<usize>,
    /// Value for the `COUNT <count>` cmd arg.
    count: Option<usize>,
    /// Whether to write the `NOACK` cmd arg.
    noack: Option<bool>,
    /// Parts of the `GROUP <groupname> <consumername>` cmd arg.
    /// Setting this toggles the cmd from XREAD to XREADGROUP.
    group: SRGroup,
}
377
378impl StreamReadOptions {
379    /// Indicates whether the command is participating in a group
380    /// and generating ACKs
381    pub fn read_only(&self) -> bool {
382        self.group.is_none()
383    }
384
385    /// Sets the command so that it avoids adding the message
386    /// to the PEL in cases where reliability is not a requirement
387    /// and the occasional message loss is acceptable.
388    pub fn noack(mut self) -> Self {
389        self.noack = Some(true);
390        self
391    }
392
393    /// Sets the block time in milliseconds.
394    pub fn block(mut self, ms: usize) -> Self {
395        self.block = Some(ms);
396        self
397    }
398
399    /// Sets the maximum number of elements to return per stream.
400    pub fn count(mut self, n: usize) -> Self {
401        self.count = Some(n);
402        self
403    }
404
405    /// Sets the name of a consumer group associated to the stream.
406    pub fn group<GN: ToRedisArgs, CN: ToRedisArgs>(
407        mut self,
408        group_name: GN,
409        consumer_name: CN,
410    ) -> Self {
411        self.group = Some((
412            ToRedisArgs::to_redis_args(&group_name),
413            ToRedisArgs::to_redis_args(&consumer_name),
414        ));
415        self
416    }
417}
418
419impl ToRedisArgs for StreamReadOptions {
420    fn write_redis_args<W>(&self, out: &mut W)
421    where
422        W: ?Sized + RedisWrite,
423    {
424        if let Some(ref group) = self.group {
425            out.write_arg(b"GROUP");
426            for i in &group.0 {
427                out.write_arg(i);
428            }
429            for i in &group.1 {
430                out.write_arg(i);
431            }
432        }
433
434        if let Some(ref ms) = self.block {
435            out.write_arg(b"BLOCK");
436            out.write_arg(format!("{ms}").as_bytes());
437        }
438
439        if let Some(ref n) = self.count {
440            out.write_arg(b"COUNT");
441            out.write_arg(format!("{n}").as_bytes());
442        }
443
444        if self.group.is_some() {
445            // noack is only available w/ xreadgroup
446            if self.noack == Some(true) {
447                out.write_arg(b"NOACK");
448            }
449        }
450    }
451}
452
453/// Reply type used with the [`xautoclaim_options`] command.
454///
455/// [`xautoclaim_options`]: ../trait.Commands.html#method.xautoclaim_options
456///
457#[derive(Default, Debug, Clone)]
458pub struct StreamAutoClaimReply {
459    /// The next stream id to use as the start argument for the next xautoclaim
460    pub next_stream_id: String,
461    /// The entries claimed for the consumer. When JUSTID is enabled the map in each entry is blank
462    pub claimed: Vec<StreamId>,
463    /// The list of stream ids that were removed due to no longer being in the stream
464    pub deleted_ids: Vec<String>,
465}
466
467/// Reply type used with [`xread`] or [`xread_options`] commands.
468///
469/// [`xread`]: ../trait.Commands.html#method.xread
470/// [`xread_options`]: ../trait.Commands.html#method.xread_options
471///
472#[derive(Default, Debug, Clone)]
473pub struct StreamReadReply {
474    /// Complex data structure containing a payload for each key in this array
475    pub keys: Vec<StreamKey>,
476}
477
478/// Reply type used with [`xrange`], [`xrange_count`], [`xrange_all`], [`xrevrange`], [`xrevrange_count`], [`xrevrange_all`] commands.
479///
480/// Represents stream entries matching a given range of `id`'s.
481///
482/// [`xrange`]: ../trait.Commands.html#method.xrange
483/// [`xrange_count`]: ../trait.Commands.html#method.xrange_count
484/// [`xrange_all`]: ../trait.Commands.html#method.xrange_all
485/// [`xrevrange`]: ../trait.Commands.html#method.xrevrange
486/// [`xrevrange_count`]: ../trait.Commands.html#method.xrevrange_count
487/// [`xrevrange_all`]: ../trait.Commands.html#method.xrevrange_all
488///
489#[derive(Default, Debug, Clone)]
490pub struct StreamRangeReply {
491    /// Complex data structure containing a payload for each ID in this array
492    pub ids: Vec<StreamId>,
493}
494
495/// Reply type used with [`xclaim`] command.
496///
497/// Represents that ownership of the specified messages was changed.
498///
499/// [`xclaim`]: ../trait.Commands.html#method.xclaim
500///
501#[derive(Default, Debug, Clone)]
502pub struct StreamClaimReply {
503    /// Complex data structure containing a payload for each ID in this array
504    pub ids: Vec<StreamId>,
505}
506
507/// Reply type used with [`xpending`] command.
508///
509/// Data returned here were fetched from the stream without
510/// having been acknowledged.
511///
512/// [`xpending`]: ../trait.Commands.html#method.xpending
513///
514#[derive(Debug, Clone, Default)]
515pub enum StreamPendingReply {
516    /// The stream is empty.
517    #[default]
518    Empty,
519    /// Data with payload exists in the stream.
520    Data(StreamPendingData),
521}
522
523impl StreamPendingReply {
524    /// Returns how many records are in the reply.
525    pub fn count(&self) -> usize {
526        match self {
527            StreamPendingReply::Empty => 0,
528            StreamPendingReply::Data(x) => x.count,
529        }
530    }
531}
532
533/// Inner reply type when an [`xpending`] command has data.
534///
535/// [`xpending`]: ../trait.Commands.html#method.xpending
536#[derive(Default, Debug, Clone)]
537pub struct StreamPendingData {
538    /// Limit on the number of messages to return per call.
539    pub count: usize,
540    /// ID for the first pending record.
541    pub start_id: String,
542    /// ID for the final pending record.
543    pub end_id: String,
544    /// Every consumer in the consumer group with at
545    /// least one pending message,
546    /// and the number of pending messages it has.
547    pub consumers: Vec<StreamInfoConsumer>,
548}
549
550/// Reply type used with [`xpending_count`] and
551/// [`xpending_consumer_count`] commands.
552///
553/// Data returned here have been fetched from the stream without
554/// any acknowledgement.
555///
556/// [`xpending_count`]: ../trait.Commands.html#method.xpending_count
557/// [`xpending_consumer_count`]: ../trait.Commands.html#method.xpending_consumer_count
558///
559#[derive(Default, Debug, Clone)]
560pub struct StreamPendingCountReply {
561    /// An array of structs containing information about
562    /// message IDs yet to be acknowledged by various consumers,
563    /// time since last ack, and total number of acks by that consumer.
564    pub ids: Vec<StreamPendingId>,
565}
566
567/// Reply type used with [`xinfo_stream`] command, containing
568/// general information about the stream stored at the specified key.
569///
570/// The very first and last IDs in the stream are shown,
571/// in order to give some sense about what is the stream content.
572///
573/// [`xinfo_stream`]: ../trait.Commands.html#method.xinfo_stream
574///
575#[derive(Default, Debug, Clone)]
576pub struct StreamInfoStreamReply {
577    /// The last generated ID that may not be the same as the last
578    /// entry ID in case some entry was deleted.
579    pub last_generated_id: String,
580    /// Details about the radix tree representing the stream mostly
581    /// useful for optimization and debugging tasks.
582    pub radix_tree_keys: usize,
583    /// The number of consumer groups associated with the stream.
584    pub groups: usize,
585    /// Number of elements of the stream.
586    pub length: usize,
587    /// The very first entry in the stream.
588    pub first_entry: StreamId,
589    /// The very last entry in the stream.
590    pub last_entry: StreamId,
591}
592
593/// Reply type used with [`xinfo_consumer`] command, an array of every
594/// consumer in a specific consumer group.
595///
596/// [`xinfo_consumer`]: ../trait.Commands.html#method.xinfo_consumer
597///
598#[derive(Default, Debug, Clone)]
599pub struct StreamInfoConsumersReply {
600    /// An array of every consumer in a specific consumer group.
601    pub consumers: Vec<StreamInfoConsumer>,
602}
603
604/// Reply type used with [`xinfo_groups`] command.
605///
606/// This output represents all the consumer groups associated with
607/// the stream.
608///
609/// [`xinfo_groups`]: ../trait.Commands.html#method.xinfo_groups
610///
611#[derive(Default, Debug, Clone)]
612pub struct StreamInfoGroupsReply {
613    /// All the consumer groups associated with the stream.
614    pub groups: Vec<StreamInfoGroup>,
615}
616
/// A consumer parsed from [`xinfo_consumers`] command.
///
/// [`xinfo_consumers`]: ../trait.Commands.html#method.xinfo_consumers
///
#[derive(Default, Debug, Clone)]
pub struct StreamInfoConsumer {
    /// Name of the consumer (not of the consumer group it belongs to).
    pub name: String,
    /// Number of pending messages for this specific consumer.
    pub pending: usize,
    /// This consumer's idle time in milliseconds.
    pub idle: usize,
}
630
/// A group parsed from the [`xinfo_groups`] command.
///
/// [`xinfo_groups`]: ../trait.Commands.html#method.xinfo_groups
///
#[derive(Debug, Clone, Default)]
pub struct StreamInfoGroup {
    /// Name of the group.
    pub name: String,
    /// Number of consumers known in the group.
    pub consumers: usize,
    /// Number of pending messages (delivered but not yet acknowledged) in the group.
    pub pending: usize,
    /// Last ID delivered to this group.
    pub last_delivered_id: String,
    /// Logical "read counter" of the last entry delivered to the group's consumers,
    /// or `None` if the server does not provide the value.
    pub entries_read: Option<usize>,
    /// Number of entries in the stream still waiting to be delivered to the
    /// group's consumers, or `None` when that number can't be determined.
    pub lag: Option<usize>,
}
652
/// A pending message parsed from the [`xpending`] methods.
///
/// [`xpending`]: ../trait.Commands.html#method.xpending
#[derive(Debug, Clone, Default)]
pub struct StreamPendingId {
    /// ID of the message.
    pub id: String,
    /// Name of the consumer that fetched the message and has
    /// yet to acknowledge it: the current owner of the message.
    pub consumer: String,
    /// Milliseconds elapsed since the last time this message
    /// was delivered to this consumer.
    pub last_delivered_ms: usize,
    /// Number of times this message was delivered.
    pub times_delivered: usize,
}
670
671/// Represents a stream `key` and its `id`'s parsed from `xread` methods.
672#[derive(Default, Debug, Clone)]
673pub struct StreamKey {
674    /// The stream `key`.
675    pub key: String,
676    /// The parsed stream `id`'s.
677    pub ids: Vec<StreamId>,
678}
679
680/// Represents a stream `id` and its field/values as a `HashMap`
681#[derive(Default, Debug, Clone)]
682pub struct StreamId {
683    /// The stream `id` (entry ID) of this particular message.
684    pub id: String,
685    /// All fields in this message, associated with their respective values.
686    pub map: HashMap<String, Value>,
687}
688
689impl StreamId {
690    /// Converts a `Value::Array` into a `StreamId`.
691    fn from_array_value(v: &Value) -> RedisResult<Self> {
692        let mut stream_id = StreamId::default();
693        if let Value::Array(ref values) = *v {
694            if let Some(v) = values.first() {
695                stream_id.id = from_redis_value(v)?;
696            }
697            if let Some(v) = values.get(1) {
698                stream_id.map = from_redis_value(v)?;
699            }
700        }
701
702        Ok(stream_id)
703    }
704
705    /// Fetches value of a given field and converts it to the specified
706    /// type.
707    pub fn get<T: FromRedisValue>(&self, key: &str) -> Option<T> {
708        match self.map.get(key) {
709            Some(x) => from_redis_value(x).ok(),
710            None => None,
711        }
712    }
713
714    /// Does the message contain a particular field?
715    pub fn contains_key(&self, key: &str) -> bool {
716        self.map.contains_key(key)
717    }
718
719    /// Returns how many field/value pairs exist in this message.
720    pub fn len(&self) -> usize {
721        self.map.len()
722    }
723
724    /// Returns true if there are no field/value pairs in this message.
725    pub fn is_empty(&self) -> bool {
726        self.len() == 0
727    }
728}
729
730type SACRows = Vec<HashMap<String, HashMap<String, Value>>>;
731
732impl FromRedisValue for StreamAutoClaimReply {
733    fn from_redis_value(v: &Value) -> RedisResult<Self> {
734        match *v {
735            Value::Array(ref items) => {
736                if let 2..=3 = items.len() {
737                    let deleted_ids = if let Some(o) = items.get(2) {
738                        from_redis_value(o)?
739                    } else {
740                        Vec::new()
741                    };
742
743                    let claimed: Vec<StreamId> = match &items[1] {
744                        // JUSTID response
745                        Value::Array(x)
746                            if matches!(x.first(), None | Some(Value::BulkString(_))) =>
747                        {
748                            let ids: Vec<String> = from_redis_value(&items[1])?;
749
750                            ids.into_iter()
751                                .map(|id| StreamId {
752                                    id,
753                                    ..Default::default()
754                                })
755                                .collect()
756                        }
757                        // full response
758                        Value::Array(x) if matches!(x.first(), Some(Value::Array(_))) => {
759                            let rows: SACRows = from_redis_value(&items[1])?;
760
761                            rows.into_iter()
762                                .flat_map(|id_row| {
763                                    id_row.into_iter().map(|(id, map)| StreamId { id, map })
764                                })
765                                .collect()
766                        }
767                        _ => invalid_type_error!("Incorrect type", &items[1]),
768                    };
769
770                    Ok(Self {
771                        next_stream_id: from_redis_value(&items[0])?,
772                        claimed,
773                        deleted_ids,
774                    })
775                } else {
776                    invalid_type_error!("Wrong number of entries in array response", v)
777                }
778            }
779            _ => invalid_type_error!("Not a array response", v),
780        }
781    }
782}
783
784type SRRows = Vec<HashMap<String, Vec<HashMap<String, HashMap<String, Value>>>>>;
785impl FromRedisValue for StreamReadReply {
786    fn from_redis_value(v: &Value) -> RedisResult<Self> {
787        let rows: SRRows = from_redis_value(v)?;
788        let keys = rows
789            .into_iter()
790            .flat_map(|row| {
791                row.into_iter().map(|(key, entry)| {
792                    let ids = entry
793                        .into_iter()
794                        .flat_map(|id_row| id_row.into_iter().map(|(id, map)| StreamId { id, map }))
795                        .collect();
796                    StreamKey { key, ids }
797                })
798            })
799            .collect();
800        Ok(StreamReadReply { keys })
801    }
802}
803
804impl FromRedisValue for StreamRangeReply {
805    fn from_redis_value(v: &Value) -> RedisResult<Self> {
806        let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
807        let ids: Vec<StreamId> = rows
808            .into_iter()
809            .flat_map(|row| row.into_iter().map(|(id, map)| StreamId { id, map }))
810            .collect();
811        Ok(StreamRangeReply { ids })
812    }
813}
814
815impl FromRedisValue for StreamClaimReply {
816    fn from_redis_value(v: &Value) -> RedisResult<Self> {
817        let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
818        let ids: Vec<StreamId> = rows
819            .into_iter()
820            .flat_map(|row| row.into_iter().map(|(id, map)| StreamId { id, map }))
821            .collect();
822        Ok(StreamClaimReply { ids })
823    }
824}
825
826type SPRInner = (
827    usize,
828    Option<String>,
829    Option<String>,
830    Vec<Option<(String, String)>>,
831);
832impl FromRedisValue for StreamPendingReply {
833    fn from_redis_value(v: &Value) -> RedisResult<Self> {
834        let (count, start, end, consumer_data): SPRInner = from_redis_value(v)?;
835
836        if count == 0 {
837            Ok(StreamPendingReply::Empty)
838        } else {
839            let mut result = StreamPendingData::default();
840
841            let start_id = start
842                .ok_or_else(|| Error::other("IllegalState: Non-zero pending expects start id"))?;
843
844            let end_id =
845                end.ok_or_else(|| Error::other("IllegalState: Non-zero pending expects end id"))?;
846
847            result.count = count;
848            result.start_id = start_id;
849            result.end_id = end_id;
850
851            result.consumers = consumer_data
852                .into_iter()
853                .flatten()
854                .map(|(name, pending)| StreamInfoConsumer {
855                    name,
856                    pending: pending.parse().unwrap_or_default(),
857                    ..Default::default()
858                })
859                .collect();
860
861            Ok(StreamPendingReply::Data(result))
862        }
863    }
864}
865
866impl FromRedisValue for StreamPendingCountReply {
867    fn from_redis_value(v: &Value) -> RedisResult<Self> {
868        let mut reply = StreamPendingCountReply::default();
869        match v {
870            Value::Array(outer_tuple) => {
871                for outer in outer_tuple {
872                    match outer {
873                        Value::Array(inner_tuple) => match &inner_tuple[..] {
874                            [Value::BulkString(id_bytes), Value::BulkString(consumer_bytes), Value::Int(last_delivered_ms_u64), Value::Int(times_delivered_u64)] =>
875                            {
876                                let id = String::from_utf8(id_bytes.to_vec())?;
877                                let consumer = String::from_utf8(consumer_bytes.to_vec())?;
878                                let last_delivered_ms = *last_delivered_ms_u64 as usize;
879                                let times_delivered = *times_delivered_u64 as usize;
880                                reply.ids.push(StreamPendingId {
881                                    id,
882                                    consumer,
883                                    last_delivered_ms,
884                                    times_delivered,
885                                });
886                            }
887                            _ => fail!((
888                                crate::types::ErrorKind::TypeError,
889                                "Cannot parse redis data (3)"
890                            )),
891                        },
892                        _ => fail!((
893                            crate::types::ErrorKind::TypeError,
894                            "Cannot parse redis data (2)"
895                        )),
896                    }
897                }
898            }
899            _ => fail!((
900                crate::types::ErrorKind::TypeError,
901                "Cannot parse redis data (1)"
902            )),
903        };
904        Ok(reply)
905    }
906}
907
908impl FromRedisValue for StreamInfoStreamReply {
909    fn from_redis_value(v: &Value) -> RedisResult<Self> {
910        let map: HashMap<String, Value> = from_redis_value(v)?;
911        let mut reply = StreamInfoStreamReply::default();
912        if let Some(v) = &map.get("last-generated-id") {
913            reply.last_generated_id = from_redis_value(v)?;
914        }
915        if let Some(v) = &map.get("radix-tree-nodes") {
916            reply.radix_tree_keys = from_redis_value(v)?;
917        }
918        if let Some(v) = &map.get("groups") {
919            reply.groups = from_redis_value(v)?;
920        }
921        if let Some(v) = &map.get("length") {
922            reply.length = from_redis_value(v)?;
923        }
924        if let Some(v) = &map.get("first-entry") {
925            reply.first_entry = StreamId::from_array_value(v)?;
926        }
927        if let Some(v) = &map.get("last-entry") {
928            reply.last_entry = StreamId::from_array_value(v)?;
929        }
930        Ok(reply)
931    }
932}
933
934impl FromRedisValue for StreamInfoConsumersReply {
935    fn from_redis_value(v: &Value) -> RedisResult<Self> {
936        let consumers: Vec<HashMap<String, Value>> = from_redis_value(v)?;
937        let mut reply = StreamInfoConsumersReply::default();
938        for map in consumers {
939            let mut c = StreamInfoConsumer::default();
940            if let Some(v) = &map.get("name") {
941                c.name = from_redis_value(v)?;
942            }
943            if let Some(v) = &map.get("pending") {
944                c.pending = from_redis_value(v)?;
945            }
946            if let Some(v) = &map.get("idle") {
947                c.idle = from_redis_value(v)?;
948            }
949            reply.consumers.push(c);
950        }
951
952        Ok(reply)
953    }
954}
955
956impl FromRedisValue for StreamInfoGroupsReply {
957    fn from_redis_value(v: &Value) -> RedisResult<Self> {
958        let groups: Vec<HashMap<String, Value>> = from_redis_value(v)?;
959        let mut reply = StreamInfoGroupsReply::default();
960        for map in groups {
961            let mut g = StreamInfoGroup::default();
962            if let Some(v) = &map.get("name") {
963                g.name = from_redis_value(v)?;
964            }
965            if let Some(v) = &map.get("pending") {
966                g.pending = from_redis_value(v)?;
967            }
968            if let Some(v) = &map.get("consumers") {
969                g.consumers = from_redis_value(v)?;
970            }
971            if let Some(v) = &map.get("last-delivered-id") {
972                g.last_delivered_id = from_redis_value(v)?;
973            }
974            if let Some(v) = &map.get("entries-read") {
975                g.entries_read = if let Value::Nil = v {
976                    None
977                } else {
978                    Some(from_redis_value(v)?)
979                };
980            }
981            if let Some(v) = &map.get("lag") {
982                g.lag = if let Value::Nil = v {
983                    None
984                } else {
985                    Some(from_redis_value(v)?)
986                };
987            }
988            reply.groups.push(g);
989        }
990        Ok(reply)
991    }
992}
993
/// Deletion policy for stream entries.
///
/// Selects how references to deleted entries in the consumer groups' Pending
/// Entries Lists (PELs) are treated. The default is
/// [`StreamDeletionPolicy::KeepRef`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum StreamDeletionPolicy {
    /// Preserve existing references to the deleted entries in all consumer groups' PEL.
    #[default]
    KeepRef,
    /// Delete the entry from the stream and from all the consumer groups' PELs.
    DelRef,
    /// Delete the entry from the stream and from all the consumer groups' PELs, but only if the entry is acknowledged by all the groups.
    Acked,
}
1005
1006impl ToRedisArgs for StreamDeletionPolicy {
1007    fn write_redis_args<W>(&self, out: &mut W)
1008    where
1009        W: ?Sized + RedisWrite,
1010    {
1011        match self {
1012            StreamDeletionPolicy::KeepRef => out.write_arg(b"KEEPREF"),
1013            StreamDeletionPolicy::DelRef => out.write_arg(b"DELREF"),
1014            StreamDeletionPolicy::Acked => out.write_arg(b"ACKED"),
1015        }
1016    }
1017}
1018
/// Status codes returned by the `XDELEX` command
///
/// The discriminant of each variant is the integer that is parsed into it.
#[cfg(feature = "streams")]
#[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum XDelExStatusCode {
    /// No entry with the given id exists in the stream
    IdNotFound = -1,
    /// The entry was deleted from the stream
    Deleted = 1,
    /// The entry was not deleted because it has either not been delivered to any consumer
    /// or still has references in the consumer groups' Pending Entries List (PEL)
    NotDeletedUnacknowledgedOrStillReferenced = 2,
}
1032
#[cfg(feature = "streams")]
impl FromRedisValue for XDelExStatusCode {
    /// Converts an integer reply into the matching [`XDelExStatusCode`].
    ///
    /// # Errors
    ///
    /// Returns a `TypeError` when `v` is not an integer, or when the integer is
    /// not one of `-1`, `1` or `2`.
    fn from_redis_value(v: &Value) -> RedisResult<Self> {
        match v {
            Value::Int(-1) => Ok(Self::IdNotFound),
            Value::Int(1) => Ok(Self::Deleted),
            Value::Int(2) => Ok(Self::NotDeletedUnacknowledgedOrStillReferenced),
            // Any other integer is a well-typed reply carrying an unknown code.
            Value::Int(code) => Err(RedisError::from((
                ErrorKind::TypeError,
                "Invalid XDelExStatusCode status code",
                format!("Unknown status code: {code}"),
            ))),
            _ => Err(RedisError::from((
                ErrorKind::TypeError,
                "Response type not XDelExStatusCode compatible",
            ))),
        }
    }
}
1054
/// Status codes returned by the `XACKDEL` command
///
/// The discriminant of each variant is the integer that is parsed into it.
#[cfg(feature = "streams")]
#[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum XAckDelStatusCode {
    /// No entry with the given id exists in the stream
    IdNotFound = -1,
    /// The entry was acknowledged and deleted from the stream
    AcknowledgedAndDeleted = 1,
    /// The entry was acknowledged but not deleted because it has references in the consumer groups' Pending Entries List (PEL)
    AcknowledgedNotDeletedStillReferenced = 2,
}
1067
#[cfg(feature = "streams")]
impl FromRedisValue for XAckDelStatusCode {
    /// Converts an integer reply into the matching [`XAckDelStatusCode`].
    ///
    /// # Errors
    ///
    /// Returns a `TypeError` when `v` is not an integer, or when the integer is
    /// not one of `-1`, `1` or `2`.
    fn from_redis_value(v: &Value) -> RedisResult<Self> {
        match v {
            Value::Int(-1) => Ok(Self::IdNotFound),
            Value::Int(1) => Ok(Self::AcknowledgedAndDeleted),
            Value::Int(2) => Ok(Self::AcknowledgedNotDeletedStillReferenced),
            // Any other integer is a well-typed reply carrying an unknown code.
            Value::Int(code) => Err(RedisError::from((
                ErrorKind::TypeError,
                "Invalid XAckDelStatusCode status code",
                format!("Unknown status code: {code}"),
            ))),
            _ => Err(RedisError::from((
                ErrorKind::TypeError,
                "Response type not XAckDelStatusCode compatible",
            ))),
        }
    }
}
1089
1090#[cfg(test)]
1091mod tests {
1092    use super::*;
1093
1094    fn assert_command_eq(object: impl ToRedisArgs, expected: &[u8]) {
1095        let mut out: Vec<Vec<u8>> = Vec::new();
1096
1097        object.write_redis_args(&mut out);
1098
1099        let mut cmd: Vec<u8> = Vec::new();
1100
1101        out.iter_mut().for_each(|item| {
1102            cmd.append(item);
1103            cmd.push(b' ');
1104        });
1105
1106        cmd.pop();
1107
1108        assert_eq!(cmd, expected);
1109    }
1110
1111    mod stream_auto_claim_reply {
1112        use super::*;
1113        use crate::Value;
1114
1115        #[test]
1116        fn short_response() {
1117            let value = Value::Array(vec![Value::BulkString("1713465536578-0".into())]);
1118
1119            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1120
1121            assert!(reply.is_err());
1122        }
1123
1124        #[test]
1125        fn parses_none_claimed_response() {
1126            let value = Value::Array(vec![
1127                Value::BulkString("0-0".into()),
1128                Value::Array(vec![]),
1129                Value::Array(vec![]),
1130            ]);
1131
1132            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1133
1134            assert!(reply.is_ok());
1135
1136            let reply = reply.unwrap();
1137
1138            assert_eq!(reply.next_stream_id.as_str(), "0-0");
1139            assert_eq!(reply.claimed.len(), 0);
1140            assert_eq!(reply.deleted_ids.len(), 0);
1141        }
1142
1143        #[test]
1144        fn parses_response() {
1145            let value = Value::Array(vec![
1146                Value::BulkString("1713465536578-0".into()),
1147                Value::Array(vec![
1148                    Value::Array(vec![
1149                        Value::BulkString("1713465533411-0".into()),
1150                        // Both RESP2 and RESP3 expose this map as an array of key/values
1151                        Value::Array(vec![
1152                            Value::BulkString("name".into()),
1153                            Value::BulkString("test".into()),
1154                            Value::BulkString("other".into()),
1155                            Value::BulkString("whaterver".into()),
1156                        ]),
1157                    ]),
1158                    Value::Array(vec![
1159                        Value::BulkString("1713465536069-0".into()),
1160                        Value::Array(vec![
1161                            Value::BulkString("name".into()),
1162                            Value::BulkString("another test".into()),
1163                            Value::BulkString("other".into()),
1164                            Value::BulkString("something".into()),
1165                        ]),
1166                    ]),
1167                ]),
1168                Value::Array(vec![Value::BulkString("123456789-0".into())]),
1169            ]);
1170
1171            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1172
1173            assert!(reply.is_ok());
1174
1175            let reply = reply.unwrap();
1176
1177            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1178            assert_eq!(reply.claimed.len(), 2);
1179            assert_eq!(reply.claimed[0].id.as_str(), "1713465533411-0");
1180            assert!(
1181                matches!(reply.claimed[0].map.get("name"), Some(Value::BulkString(v)) if v == "test".as_bytes())
1182            );
1183            assert_eq!(reply.claimed[1].id.as_str(), "1713465536069-0");
1184            assert_eq!(reply.deleted_ids.len(), 1);
1185            assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
1186        }
1187
1188        #[test]
1189        fn parses_v6_response() {
1190            let value = Value::Array(vec![
1191                Value::BulkString("1713465536578-0".into()),
1192                Value::Array(vec![
1193                    Value::Array(vec![
1194                        Value::BulkString("1713465533411-0".into()),
1195                        Value::Array(vec![
1196                            Value::BulkString("name".into()),
1197                            Value::BulkString("test".into()),
1198                            Value::BulkString("other".into()),
1199                            Value::BulkString("whaterver".into()),
1200                        ]),
1201                    ]),
1202                    Value::Array(vec![
1203                        Value::BulkString("1713465536069-0".into()),
1204                        Value::Array(vec![
1205                            Value::BulkString("name".into()),
1206                            Value::BulkString("another test".into()),
1207                            Value::BulkString("other".into()),
1208                            Value::BulkString("something".into()),
1209                        ]),
1210                    ]),
1211                ]),
1212                // V6 and lower lack the deleted_ids array
1213            ]);
1214
1215            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1216
1217            assert!(reply.is_ok());
1218
1219            let reply = reply.unwrap();
1220
1221            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1222            assert_eq!(reply.claimed.len(), 2);
1223            let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1224            assert!(ids.contains(&"1713465533411-0"));
1225            assert!(ids.contains(&"1713465536069-0"));
1226            assert_eq!(reply.deleted_ids.len(), 0);
1227        }
1228
1229        #[test]
1230        fn parses_justid_response() {
1231            let value = Value::Array(vec![
1232                Value::BulkString("1713465536578-0".into()),
1233                Value::Array(vec![
1234                    Value::BulkString("1713465533411-0".into()),
1235                    Value::BulkString("1713465536069-0".into()),
1236                ]),
1237                Value::Array(vec![Value::BulkString("123456789-0".into())]),
1238            ]);
1239
1240            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1241
1242            assert!(reply.is_ok());
1243
1244            let reply = reply.unwrap();
1245
1246            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1247            assert_eq!(reply.claimed.len(), 2);
1248            let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1249            assert!(ids.contains(&"1713465533411-0"));
1250            assert!(ids.contains(&"1713465536069-0"));
1251            assert_eq!(reply.deleted_ids.len(), 1);
1252            assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
1253        }
1254
1255        #[test]
1256        fn parses_v6_justid_response() {
1257            let value = Value::Array(vec![
1258                Value::BulkString("1713465536578-0".into()),
1259                Value::Array(vec![
1260                    Value::BulkString("1713465533411-0".into()),
1261                    Value::BulkString("1713465536069-0".into()),
1262                ]),
1263                // V6 and lower lack the deleted_ids array
1264            ]);
1265
1266            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1267
1268            assert!(reply.is_ok());
1269
1270            let reply = reply.unwrap();
1271
1272            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1273            assert_eq!(reply.claimed.len(), 2);
1274            let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1275            assert!(ids.contains(&"1713465533411-0"));
1276            assert!(ids.contains(&"1713465536069-0"));
1277            assert_eq!(reply.deleted_ids.len(), 0);
1278        }
1279    }
1280
1281    mod stream_trim_options {
1282        use super::*;
1283
1284        #[test]
1285        fn maxlen_trim() {
1286            let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10);
1287
1288            assert_command_eq(options, b"MAXLEN ~ 10");
1289        }
1290
1291        #[test]
1292        fn maxlen_exact_trim() {
1293            let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Exact, 10);
1294
1295            assert_command_eq(options, b"MAXLEN = 10");
1296        }
1297
1298        #[test]
1299        fn maxlen_trim_limit() {
1300            let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10).limit(5);
1301
1302            assert_command_eq(options, b"MAXLEN ~ 10 LIMIT 5");
1303        }
1304        #[test]
1305        fn minid_trim_limit() {
1306            let options = StreamTrimOptions::minid(StreamTrimmingMode::Exact, "123456-7").limit(5);
1307
1308            assert_command_eq(options, b"MINID = 123456-7 LIMIT 5");
1309        }
1310    }
1311
1312    mod stream_add_options {
1313        use super::*;
1314
1315        #[test]
1316        fn the_default() {
1317            let options = StreamAddOptions::default();
1318
1319            assert_command_eq(options, b"");
1320        }
1321
1322        #[test]
1323        fn with_maxlen_trim() {
1324            let options = StreamAddOptions::default()
1325                .trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10));
1326
1327            assert_command_eq(options, b"MAXLEN = 10");
1328        }
1329
1330        #[test]
1331        fn with_nomkstream() {
1332            let options = StreamAddOptions::default().nomkstream();
1333
1334            assert_command_eq(options, b"NOMKSTREAM");
1335        }
1336
1337        #[test]
1338        fn with_nomkstream_and_maxlen_trim() {
1339            let options = StreamAddOptions::default()
1340                .nomkstream()
1341                .trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10));
1342
1343            assert_command_eq(options, b"NOMKSTREAM MAXLEN = 10");
1344        }
1345    }
1346}