1use crate::{
4    from_redis_value, types::HashMap, ErrorKind, FromRedisValue, RedisError, RedisResult,
5    RedisWrite, ToRedisArgs, Value,
6};
7
8use std::io::Error;
9
/// Raises a `TypeError` through the crate's `fail!` macro.
///
/// Both arguments are rendered with `{:?}` into the error detail string:
/// the second argument comes first, followed by the first argument inside
/// the `(response was ...)` suffix.
macro_rules! invalid_type_error {
    ($response:expr, $detail:expr) => {{
        fail!((
            $crate::ErrorKind::TypeError,
            "Response was of incompatible type",
            format!("{:?} (response was {:?})", $detail, $response)
        ));
    }};
}
19
/// Length cap for a stream, serialized as `MAXLEN <op> <n>`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamMaxlen {
    /// Serialized with the `=` operator.
    Equals(usize),
    /// Serialized with the `~` operator.
    Approx(usize),
}
32
33impl ToRedisArgs for StreamMaxlen {
34    fn write_redis_args<W>(&self, out: &mut W)
35    where
36        W: ?Sized + RedisWrite,
37    {
38        let (ch, val) = match *self {
39            StreamMaxlen::Equals(v) => ("=", v),
40            StreamMaxlen::Approx(v) => ("~", v),
41        };
42        out.write_arg(b"MAXLEN");
43        out.write_arg(ch.as_bytes());
44        val.write_redis_args(out);
45    }
46}
47
/// Operator used when trimming a stream.
///
/// Derives the same value-type traits as [`StreamMaxlen`] so the mode can be
/// copied and compared freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamTrimmingMode {
    /// Serialized as `=`.
    Exact,
    /// Serialized as `~`.
    Approx,
}
57
58impl ToRedisArgs for StreamTrimmingMode {
59    fn write_redis_args<W>(&self, out: &mut W)
60    where
61        W: ?Sized + RedisWrite,
62    {
63        match self {
64            Self::Exact => out.write_arg(b"="),
65            Self::Approx => out.write_arg(b"~"),
66        };
67    }
68}
69
70#[derive(Debug)]
74pub enum StreamTrimStrategy {
75    MaxLen(StreamTrimmingMode, usize, Option<usize>),
77    MinId(StreamTrimmingMode, String, Option<usize>),
79}
80
81impl StreamTrimStrategy {
82    pub fn maxlen(trim: StreamTrimmingMode, max_entries: usize) -> Self {
84        Self::MaxLen(trim, max_entries, None)
85    }
86
87    pub fn minid(trim: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
89        Self::MinId(trim, stream_id.into(), None)
90    }
91
92    pub fn limit(self, limit: usize) -> Self {
94        match self {
95            StreamTrimStrategy::MaxLen(m, t, _) => StreamTrimStrategy::MaxLen(m, t, Some(limit)),
96            StreamTrimStrategy::MinId(m, t, _) => StreamTrimStrategy::MinId(m, t, Some(limit)),
97        }
98    }
99}
100
101impl ToRedisArgs for StreamTrimStrategy {
102    fn write_redis_args<W>(&self, out: &mut W)
103    where
104        W: ?Sized + RedisWrite,
105    {
106        let limit = match self {
107            StreamTrimStrategy::MaxLen(m, t, limit) => {
108                out.write_arg(b"MAXLEN");
109                m.write_redis_args(out);
110                t.write_redis_args(out);
111                limit
112            }
113            StreamTrimStrategy::MinId(m, t, limit) => {
114                out.write_arg(b"MINID");
115                m.write_redis_args(out);
116                t.write_redis_args(out);
117                limit
118            }
119        };
120        if let Some(limit) = limit {
121            out.write_arg(b"LIMIT");
122            limit.write_redis_args(out);
123        }
124    }
125}
126
127#[derive(Debug)]
132pub struct StreamTrimOptions {
133    strategy: StreamTrimStrategy,
134    deletion_policy: Option<StreamDeletionPolicy>,
135}
136
137impl StreamTrimOptions {
138    pub fn maxlen(mode: StreamTrimmingMode, max_entries: usize) -> Self {
140        Self {
141            strategy: StreamTrimStrategy::maxlen(mode, max_entries),
142            deletion_policy: None,
143        }
144    }
145
146    pub fn minid(mode: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
148        Self {
149            strategy: StreamTrimStrategy::minid(mode, stream_id),
150            deletion_policy: None,
151        }
152    }
153
154    pub fn limit(mut self, limit: usize) -> Self {
156        self.strategy = self.strategy.limit(limit);
157        self
158    }
159
160    pub fn set_deletion_policy(mut self, deletion_policy: StreamDeletionPolicy) -> Self {
162        self.deletion_policy = Some(deletion_policy);
163        self
164    }
165}
166
167impl ToRedisArgs for StreamTrimOptions {
168    fn write_redis_args<W>(&self, out: &mut W)
169    where
170        W: ?Sized + RedisWrite,
171    {
172        self.strategy.write_redis_args(out);
173        if let Some(deletion_policy) = self.deletion_policy.as_ref() {
174            deletion_policy.write_redis_args(out);
175        }
176    }
177}
178
179#[derive(Default, Debug)]
184pub struct StreamAddOptions {
185    nomkstream: bool,
186    trim: Option<StreamTrimStrategy>,
187    deletion_policy: Option<StreamDeletionPolicy>,
188}
189
190impl StreamAddOptions {
191    pub fn nomkstream(mut self) -> Self {
193        self.nomkstream = true;
194        self
195    }
196
197    pub fn trim(mut self, trim: StreamTrimStrategy) -> Self {
199        self.trim = Some(trim);
200        self
201    }
202
203    pub fn set_deletion_policy(mut self, deletion_policy: StreamDeletionPolicy) -> Self {
205        self.deletion_policy = Some(deletion_policy);
206        self
207    }
208}
209
210impl ToRedisArgs for StreamAddOptions {
211    fn write_redis_args<W>(&self, out: &mut W)
212    where
213        W: ?Sized + RedisWrite,
214    {
215        if self.nomkstream {
216            out.write_arg(b"NOMKSTREAM");
217        }
218        if let Some(strategy) = self.trim.as_ref() {
219            strategy.write_redis_args(out);
220        }
221        if let Some(deletion_policy) = self.deletion_policy.as_ref() {
222            deletion_policy.write_redis_args(out);
223        }
224    }
225}
226
/// Builder-style options serialized as `COUNT <n>` and `JUSTID`.
#[derive(Debug, Default)]
pub struct StreamAutoClaimOptions {
    /// Value for the `COUNT` argument, when set.
    count: Option<usize>,
    /// When true, `JUSTID` is emitted.
    justid: bool,
}

impl StreamAutoClaimOptions {
    /// Sets the `COUNT` value.
    pub fn count(self, n: usize) -> Self {
        Self {
            count: Some(n),
            ..self
        }
    }

    /// Enables the `JUSTID` flag.
    pub fn with_justid(self) -> Self {
        Self {
            justid: true,
            ..self
        }
    }
}
251
252impl ToRedisArgs for StreamAutoClaimOptions {
253    fn write_redis_args<W>(&self, out: &mut W)
254    where
255        W: ?Sized + RedisWrite,
256    {
257        if let Some(ref count) = self.count {
258            out.write_arg(b"COUNT");
259            out.write_arg(format!("{count}").as_bytes());
260        }
261        if self.justid {
262            out.write_arg(b"JUSTID");
263        }
264    }
265}
266
/// Builder-style options serialized as `IDLE`, `TIME`, `RETRYCOUNT`,
/// `FORCE`, `JUSTID` and `LASTID` arguments.
#[derive(Debug, Default)]
pub struct StreamClaimOptions {
    /// Value for the `IDLE` argument, when set.
    idle: Option<usize>,
    /// Value for the `TIME` argument, when set.
    time: Option<usize>,
    /// Value for the `RETRYCOUNT` argument, when set.
    retry: Option<usize>,
    /// When true, `FORCE` is emitted.
    force: bool,
    /// When true, `JUSTID` is emitted.
    justid: bool,
    /// Value for the `LASTID` argument, when set.
    lastid: Option<String>,
}

impl StreamClaimOptions {
    /// Sets the `IDLE` value.
    pub fn idle(self, ms: usize) -> Self {
        Self {
            idle: Some(ms),
            ..self
        }
    }

    /// Sets the `TIME` value.
    pub fn time(self, ms_time: usize) -> Self {
        Self {
            time: Some(ms_time),
            ..self
        }
    }

    /// Sets the `RETRYCOUNT` value.
    pub fn retry(self, count: usize) -> Self {
        Self {
            retry: Some(count),
            ..self
        }
    }

    /// Enables the `FORCE` flag.
    pub fn with_force(self) -> Self {
        Self {
            force: true,
            ..self
        }
    }

    /// Enables the `JUSTID` flag.
    pub fn with_justid(self) -> Self {
        Self {
            justid: true,
            ..self
        }
    }

    /// Sets the `LASTID` value.
    pub fn with_lastid(self, lastid: impl Into<String>) -> Self {
        Self {
            lastid: Some(lastid.into()),
            ..self
        }
    }
}
326
327impl ToRedisArgs for StreamClaimOptions {
328    fn write_redis_args<W>(&self, out: &mut W)
329    where
330        W: ?Sized + RedisWrite,
331    {
332        if let Some(ref ms) = self.idle {
333            out.write_arg(b"IDLE");
334            out.write_arg(format!("{ms}").as_bytes());
335        }
336        if let Some(ref ms_time) = self.time {
337            out.write_arg(b"TIME");
338            out.write_arg(format!("{ms_time}").as_bytes());
339        }
340        if let Some(ref count) = self.retry {
341            out.write_arg(b"RETRYCOUNT");
342            out.write_arg(format!("{count}").as_bytes());
343        }
344        if self.force {
345            out.write_arg(b"FORCE");
346        }
347        if self.justid {
348            out.write_arg(b"JUSTID");
349        }
350        if let Some(ref lastid) = self.lastid {
351            out.write_arg(b"LASTID");
352            lastid.write_redis_args(out);
353        }
354    }
355}
356
357type SRGroup = Option<(Vec<Vec<u8>>, Vec<Vec<u8>>)>;
361#[derive(Default, Debug)]
366pub struct StreamReadOptions {
367    block: Option<usize>,
369    count: Option<usize>,
371    noack: Option<bool>,
373    group: SRGroup,
376}
377
378impl StreamReadOptions {
379    pub fn read_only(&self) -> bool {
382        self.group.is_none()
383    }
384
385    pub fn noack(mut self) -> Self {
389        self.noack = Some(true);
390        self
391    }
392
393    pub fn block(mut self, ms: usize) -> Self {
395        self.block = Some(ms);
396        self
397    }
398
399    pub fn count(mut self, n: usize) -> Self {
401        self.count = Some(n);
402        self
403    }
404
405    pub fn group<GN: ToRedisArgs, CN: ToRedisArgs>(
407        mut self,
408        group_name: GN,
409        consumer_name: CN,
410    ) -> Self {
411        self.group = Some((
412            ToRedisArgs::to_redis_args(&group_name),
413            ToRedisArgs::to_redis_args(&consumer_name),
414        ));
415        self
416    }
417}
418
419impl ToRedisArgs for StreamReadOptions {
420    fn write_redis_args<W>(&self, out: &mut W)
421    where
422        W: ?Sized + RedisWrite,
423    {
424        if let Some(ref group) = self.group {
425            out.write_arg(b"GROUP");
426            for i in &group.0 {
427                out.write_arg(i);
428            }
429            for i in &group.1 {
430                out.write_arg(i);
431            }
432        }
433
434        if let Some(ref ms) = self.block {
435            out.write_arg(b"BLOCK");
436            out.write_arg(format!("{ms}").as_bytes());
437        }
438
439        if let Some(ref n) = self.count {
440            out.write_arg(b"COUNT");
441            out.write_arg(format!("{n}").as_bytes());
442        }
443
444        if self.group.is_some() {
445            if self.noack == Some(true) {
447                out.write_arg(b"NOACK");
448            }
449        }
450    }
451}
452
453#[derive(Default, Debug, Clone)]
458pub struct StreamAutoClaimReply {
459    pub next_stream_id: String,
461    pub claimed: Vec<StreamId>,
463    pub deleted_ids: Vec<String>,
465}
466
467#[derive(Default, Debug, Clone)]
473pub struct StreamReadReply {
474    pub keys: Vec<StreamKey>,
476}
477
478#[derive(Default, Debug, Clone)]
490pub struct StreamRangeReply {
491    pub ids: Vec<StreamId>,
493}
494
495#[derive(Default, Debug, Clone)]
502pub struct StreamClaimReply {
503    pub ids: Vec<StreamId>,
505}
506
507#[derive(Debug, Clone, Default)]
515pub enum StreamPendingReply {
516    #[default]
518    Empty,
519    Data(StreamPendingData),
521}
522
523impl StreamPendingReply {
524    pub fn count(&self) -> usize {
526        match self {
527            StreamPendingReply::Empty => 0,
528            StreamPendingReply::Data(x) => x.count,
529        }
530    }
531}
532
533#[derive(Default, Debug, Clone)]
537pub struct StreamPendingData {
538    pub count: usize,
540    pub start_id: String,
542    pub end_id: String,
544    pub consumers: Vec<StreamInfoConsumer>,
548}
549
550#[derive(Default, Debug, Clone)]
560pub struct StreamPendingCountReply {
561    pub ids: Vec<StreamPendingId>,
565}
566
567#[derive(Default, Debug, Clone)]
576pub struct StreamInfoStreamReply {
577    pub last_generated_id: String,
580    pub radix_tree_keys: usize,
583    pub groups: usize,
585    pub length: usize,
587    pub first_entry: StreamId,
589    pub last_entry: StreamId,
591}
592
593#[derive(Default, Debug, Clone)]
599pub struct StreamInfoConsumersReply {
600    pub consumers: Vec<StreamInfoConsumer>,
602}
603
604#[derive(Default, Debug, Clone)]
612pub struct StreamInfoGroupsReply {
613    pub groups: Vec<StreamInfoGroup>,
615}
616
/// Information about a single consumer.
#[derive(Debug, Clone, Default)]
pub struct StreamInfoConsumer {
    /// Consumer name.
    pub name: String,
    /// Pending count reported for the consumer.
    pub pending: usize,
    /// Read from the `idle` entry of a consumer info map.
    pub idle: usize,
}
630
/// Information about a single consumer group.
#[derive(Debug, Clone, Default)]
pub struct StreamInfoGroup {
    /// Read from the `name` entry.
    pub name: String,
    /// Read from the `consumers` entry.
    pub consumers: usize,
    /// Read from the `pending` entry.
    pub pending: usize,
    /// Read from the `last-delivered-id` entry.
    pub last_delivered_id: String,
    /// Read from the `entries-read` entry; `None` when missing or nil.
    pub entries_read: Option<usize>,
    /// Read from the `lag` entry; `None` when missing or nil.
    pub lag: Option<usize>,
}
652
/// One row of an extended pending reply.
#[derive(Debug, Clone, Default)]
pub struct StreamPendingId {
    /// First column of the row: the entry id.
    pub id: String,
    /// Second column of the row: the consumer name.
    pub consumer: String,
    /// Third column of the row, an integer.
    pub last_delivered_ms: usize,
    /// Fourth column of the row, an integer.
    pub times_delivered: usize,
}
670
671#[derive(Default, Debug, Clone)]
673pub struct StreamKey {
674    pub key: String,
676    pub ids: Vec<StreamId>,
678}
679
680#[derive(Default, Debug, Clone)]
682pub struct StreamId {
683    pub id: String,
685    pub map: HashMap<String, Value>,
687}
688
689impl StreamId {
690    fn from_array_value(v: &Value) -> RedisResult<Self> {
692        let mut stream_id = StreamId::default();
693        if let Value::Array(ref values) = *v {
694            if let Some(v) = values.first() {
695                stream_id.id = from_redis_value(v)?;
696            }
697            if let Some(v) = values.get(1) {
698                stream_id.map = from_redis_value(v)?;
699            }
700        }
701
702        Ok(stream_id)
703    }
704
705    pub fn get<T: FromRedisValue>(&self, key: &str) -> Option<T> {
708        match self.map.get(key) {
709            Some(x) => from_redis_value(x).ok(),
710            None => None,
711        }
712    }
713
714    pub fn contains_key(&self, key: &str) -> bool {
716        self.map.contains_key(key)
717    }
718
719    pub fn len(&self) -> usize {
721        self.map.len()
722    }
723
724    pub fn is_empty(&self) -> bool {
726        self.len() == 0
727    }
728}
729
730type SACRows = Vec<HashMap<String, HashMap<String, Value>>>;
731
732impl FromRedisValue for StreamAutoClaimReply {
733    fn from_redis_value(v: &Value) -> RedisResult<Self> {
734        match *v {
735            Value::Array(ref items) => {
736                if let 2..=3 = items.len() {
737                    let deleted_ids = if let Some(o) = items.get(2) {
738                        from_redis_value(o)?
739                    } else {
740                        Vec::new()
741                    };
742
743                    let claimed: Vec<StreamId> = match &items[1] {
744                        Value::Array(x)
746                            if matches!(x.first(), None | Some(Value::BulkString(_))) =>
747                        {
748                            let ids: Vec<String> = from_redis_value(&items[1])?;
749
750                            ids.into_iter()
751                                .map(|id| StreamId {
752                                    id,
753                                    ..Default::default()
754                                })
755                                .collect()
756                        }
757                        Value::Array(x) if matches!(x.first(), Some(Value::Array(_))) => {
759                            let rows: SACRows = from_redis_value(&items[1])?;
760
761                            rows.into_iter()
762                                .flat_map(|id_row| {
763                                    id_row.into_iter().map(|(id, map)| StreamId { id, map })
764                                })
765                                .collect()
766                        }
767                        _ => invalid_type_error!("Incorrect type", &items[1]),
768                    };
769
770                    Ok(Self {
771                        next_stream_id: from_redis_value(&items[0])?,
772                        claimed,
773                        deleted_ids,
774                    })
775                } else {
776                    invalid_type_error!("Wrong number of entries in array response", v)
777                }
778            }
779            _ => invalid_type_error!("Not a array response", v),
780        }
781    }
782}
783
784type SRRows = Vec<HashMap<String, Vec<HashMap<String, HashMap<String, Value>>>>>;
785impl FromRedisValue for StreamReadReply {
786    fn from_redis_value(v: &Value) -> RedisResult<Self> {
787        let rows: SRRows = from_redis_value(v)?;
788        let keys = rows
789            .into_iter()
790            .flat_map(|row| {
791                row.into_iter().map(|(key, entry)| {
792                    let ids = entry
793                        .into_iter()
794                        .flat_map(|id_row| id_row.into_iter().map(|(id, map)| StreamId { id, map }))
795                        .collect();
796                    StreamKey { key, ids }
797                })
798            })
799            .collect();
800        Ok(StreamReadReply { keys })
801    }
802}
803
804impl FromRedisValue for StreamRangeReply {
805    fn from_redis_value(v: &Value) -> RedisResult<Self> {
806        let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
807        let ids: Vec<StreamId> = rows
808            .into_iter()
809            .flat_map(|row| row.into_iter().map(|(id, map)| StreamId { id, map }))
810            .collect();
811        Ok(StreamRangeReply { ids })
812    }
813}
814
815impl FromRedisValue for StreamClaimReply {
816    fn from_redis_value(v: &Value) -> RedisResult<Self> {
817        let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
818        let ids: Vec<StreamId> = rows
819            .into_iter()
820            .flat_map(|row| row.into_iter().map(|(id, map)| StreamId { id, map }))
821            .collect();
822        Ok(StreamClaimReply { ids })
823    }
824}
825
826type SPRInner = (
827    usize,
828    Option<String>,
829    Option<String>,
830    Vec<Option<(String, String)>>,
831);
832impl FromRedisValue for StreamPendingReply {
833    fn from_redis_value(v: &Value) -> RedisResult<Self> {
834        let (count, start, end, consumer_data): SPRInner = from_redis_value(v)?;
835
836        if count == 0 {
837            Ok(StreamPendingReply::Empty)
838        } else {
839            let mut result = StreamPendingData::default();
840
841            let start_id = start
842                .ok_or_else(|| Error::other("IllegalState: Non-zero pending expects start id"))?;
843
844            let end_id =
845                end.ok_or_else(|| Error::other("IllegalState: Non-zero pending expects end id"))?;
846
847            result.count = count;
848            result.start_id = start_id;
849            result.end_id = end_id;
850
851            result.consumers = consumer_data
852                .into_iter()
853                .flatten()
854                .map(|(name, pending)| StreamInfoConsumer {
855                    name,
856                    pending: pending.parse().unwrap_or_default(),
857                    ..Default::default()
858                })
859                .collect();
860
861            Ok(StreamPendingReply::Data(result))
862        }
863    }
864}
865
866impl FromRedisValue for StreamPendingCountReply {
867    fn from_redis_value(v: &Value) -> RedisResult<Self> {
868        let mut reply = StreamPendingCountReply::default();
869        match v {
870            Value::Array(outer_tuple) => {
871                for outer in outer_tuple {
872                    match outer {
873                        Value::Array(inner_tuple) => match &inner_tuple[..] {
874                            [Value::BulkString(id_bytes), Value::BulkString(consumer_bytes), Value::Int(last_delivered_ms_u64), Value::Int(times_delivered_u64)] =>
875                            {
876                                let id = String::from_utf8(id_bytes.to_vec())?;
877                                let consumer = String::from_utf8(consumer_bytes.to_vec())?;
878                                let last_delivered_ms = *last_delivered_ms_u64 as usize;
879                                let times_delivered = *times_delivered_u64 as usize;
880                                reply.ids.push(StreamPendingId {
881                                    id,
882                                    consumer,
883                                    last_delivered_ms,
884                                    times_delivered,
885                                });
886                            }
887                            _ => fail!((
888                                crate::types::ErrorKind::TypeError,
889                                "Cannot parse redis data (3)"
890                            )),
891                        },
892                        _ => fail!((
893                            crate::types::ErrorKind::TypeError,
894                            "Cannot parse redis data (2)"
895                        )),
896                    }
897                }
898            }
899            _ => fail!((
900                crate::types::ErrorKind::TypeError,
901                "Cannot parse redis data (1)"
902            )),
903        };
904        Ok(reply)
905    }
906}
907
908impl FromRedisValue for StreamInfoStreamReply {
909    fn from_redis_value(v: &Value) -> RedisResult<Self> {
910        let map: HashMap<String, Value> = from_redis_value(v)?;
911        let mut reply = StreamInfoStreamReply::default();
912        if let Some(v) = &map.get("last-generated-id") {
913            reply.last_generated_id = from_redis_value(v)?;
914        }
915        if let Some(v) = &map.get("radix-tree-nodes") {
916            reply.radix_tree_keys = from_redis_value(v)?;
917        }
918        if let Some(v) = &map.get("groups") {
919            reply.groups = from_redis_value(v)?;
920        }
921        if let Some(v) = &map.get("length") {
922            reply.length = from_redis_value(v)?;
923        }
924        if let Some(v) = &map.get("first-entry") {
925            reply.first_entry = StreamId::from_array_value(v)?;
926        }
927        if let Some(v) = &map.get("last-entry") {
928            reply.last_entry = StreamId::from_array_value(v)?;
929        }
930        Ok(reply)
931    }
932}
933
934impl FromRedisValue for StreamInfoConsumersReply {
935    fn from_redis_value(v: &Value) -> RedisResult<Self> {
936        let consumers: Vec<HashMap<String, Value>> = from_redis_value(v)?;
937        let mut reply = StreamInfoConsumersReply::default();
938        for map in consumers {
939            let mut c = StreamInfoConsumer::default();
940            if let Some(v) = &map.get("name") {
941                c.name = from_redis_value(v)?;
942            }
943            if let Some(v) = &map.get("pending") {
944                c.pending = from_redis_value(v)?;
945            }
946            if let Some(v) = &map.get("idle") {
947                c.idle = from_redis_value(v)?;
948            }
949            reply.consumers.push(c);
950        }
951
952        Ok(reply)
953    }
954}
955
956impl FromRedisValue for StreamInfoGroupsReply {
957    fn from_redis_value(v: &Value) -> RedisResult<Self> {
958        let groups: Vec<HashMap<String, Value>> = from_redis_value(v)?;
959        let mut reply = StreamInfoGroupsReply::default();
960        for map in groups {
961            let mut g = StreamInfoGroup::default();
962            if let Some(v) = &map.get("name") {
963                g.name = from_redis_value(v)?;
964            }
965            if let Some(v) = &map.get("pending") {
966                g.pending = from_redis_value(v)?;
967            }
968            if let Some(v) = &map.get("consumers") {
969                g.consumers = from_redis_value(v)?;
970            }
971            if let Some(v) = &map.get("last-delivered-id") {
972                g.last_delivered_id = from_redis_value(v)?;
973            }
974            if let Some(v) = &map.get("entries-read") {
975                g.entries_read = if let Value::Nil = v {
976                    None
977                } else {
978                    Some(from_redis_value(v)?)
979                };
980            }
981            if let Some(v) = &map.get("lag") {
982                g.lag = if let Value::Nil = v {
983                    None
984                } else {
985                    Some(from_redis_value(v)?)
986                };
987            }
988            reply.groups.push(g);
989        }
990        Ok(reply)
991    }
992}
993
/// Deletion policy keyword appended to stream commands.
#[derive(Clone, Debug, Default)]
pub enum StreamDeletionPolicy {
    /// Serialized as `KEEPREF`; this is the default variant.
    #[default]
    KeepRef,
    /// Serialized as `DELREF`.
    DelRef,
    /// Serialized as `ACKED`.
    Acked,
}
1005
1006impl ToRedisArgs for StreamDeletionPolicy {
1007    fn write_redis_args<W>(&self, out: &mut W)
1008    where
1009        W: ?Sized + RedisWrite,
1010    {
1011        match self {
1012            StreamDeletionPolicy::KeepRef => out.write_arg(b"KEEPREF"),
1013            StreamDeletionPolicy::DelRef => out.write_arg(b"DELREF"),
1014            StreamDeletionPolicy::Acked => out.write_arg(b"ACKED"),
1015        }
1016    }
1017}
1018
/// Per-id status code, parsed from an integer reply (`-1`, `1` or `2`).
#[cfg(feature = "streams")]
#[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
#[derive(PartialEq, Eq, Debug)]
pub enum XDelExStatusCode {
    /// Integer code `-1`.
    IdNotFound = -1,
    /// Integer code `1`.
    Deleted = 1,
    /// Integer code `2`.
    NotDeletedUnacknowledgedOrStillReferenced = 2,
}
1032
#[cfg(feature = "streams")]
impl FromRedisValue for XDelExStatusCode {
    /// Maps an integer reply to its status code.
    ///
    /// Non-integer values and unknown codes produce a `TypeError`.
    fn from_redis_value(v: &Value) -> RedisResult<Self> {
        let Value::Int(code) = v else {
            return Err(RedisError::from((
                ErrorKind::TypeError,
                "Response type not XDelExStatusCode compatible",
            )));
        };

        let status = match *code {
            -1 => XDelExStatusCode::IdNotFound,
            1 => XDelExStatusCode::Deleted,
            2 => XDelExStatusCode::NotDeletedUnacknowledgedOrStillReferenced,
            _ => {
                return Err(RedisError::from((
                    ErrorKind::TypeError,
                    "Invalid XDelExStatusCode status code",
                    format!("Unknown status code: {code}"),
                )))
            }
        };
        Ok(status)
    }
}
1054
/// Per-id status code, parsed from an integer reply (`-1`, `1` or `2`).
#[cfg(feature = "streams")]
#[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
#[derive(PartialEq, Eq, Debug)]
pub enum XAckDelStatusCode {
    /// Integer code `-1`.
    IdNotFound = -1,
    /// Integer code `1`.
    AcknowledgedAndDeleted = 1,
    /// Integer code `2`.
    AcknowledgedNotDeletedStillReferenced = 2,
}
1067
#[cfg(feature = "streams")]
impl FromRedisValue for XAckDelStatusCode {
    /// Maps an integer reply to its status code.
    ///
    /// Non-integer values and unknown codes produce a `TypeError`.
    fn from_redis_value(v: &Value) -> RedisResult<Self> {
        let Value::Int(code) = v else {
            return Err(RedisError::from((
                ErrorKind::TypeError,
                "Response type not XAckDelStatusCode compatible",
            )));
        };

        let status = match *code {
            -1 => XAckDelStatusCode::IdNotFound,
            1 => XAckDelStatusCode::AcknowledgedAndDeleted,
            2 => XAckDelStatusCode::AcknowledgedNotDeletedStillReferenced,
            _ => {
                return Err(RedisError::from((
                    ErrorKind::TypeError,
                    "Invalid XAckDelStatusCode status code",
                    format!("Unknown status code: {code}"),
                )))
            }
        };
        Ok(status)
    }
}
1089
1090#[cfg(test)]
1091mod tests {
1092    use super::*;
1093
1094    fn assert_command_eq(object: impl ToRedisArgs, expected: &[u8]) {
1095        let mut out: Vec<Vec<u8>> = Vec::new();
1096
1097        object.write_redis_args(&mut out);
1098
1099        let mut cmd: Vec<u8> = Vec::new();
1100
1101        out.iter_mut().for_each(|item| {
1102            cmd.append(item);
1103            cmd.push(b' ');
1104        });
1105
1106        cmd.pop();
1107
1108        assert_eq!(cmd, expected);
1109    }
1110
1111    mod stream_auto_claim_reply {
1112        use super::*;
1113        use crate::Value;
1114
1115        #[test]
1116        fn short_response() {
1117            let value = Value::Array(vec![Value::BulkString("1713465536578-0".into())]);
1118
1119            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1120
1121            assert!(reply.is_err());
1122        }
1123
1124        #[test]
1125        fn parses_none_claimed_response() {
1126            let value = Value::Array(vec![
1127                Value::BulkString("0-0".into()),
1128                Value::Array(vec![]),
1129                Value::Array(vec![]),
1130            ]);
1131
1132            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1133
1134            assert!(reply.is_ok());
1135
1136            let reply = reply.unwrap();
1137
1138            assert_eq!(reply.next_stream_id.as_str(), "0-0");
1139            assert_eq!(reply.claimed.len(), 0);
1140            assert_eq!(reply.deleted_ids.len(), 0);
1141        }
1142
1143        #[test]
1144        fn parses_response() {
1145            let value = Value::Array(vec![
1146                Value::BulkString("1713465536578-0".into()),
1147                Value::Array(vec![
1148                    Value::Array(vec![
1149                        Value::BulkString("1713465533411-0".into()),
1150                        Value::Array(vec![
1152                            Value::BulkString("name".into()),
1153                            Value::BulkString("test".into()),
1154                            Value::BulkString("other".into()),
1155                            Value::BulkString("whaterver".into()),
1156                        ]),
1157                    ]),
1158                    Value::Array(vec![
1159                        Value::BulkString("1713465536069-0".into()),
1160                        Value::Array(vec![
1161                            Value::BulkString("name".into()),
1162                            Value::BulkString("another test".into()),
1163                            Value::BulkString("other".into()),
1164                            Value::BulkString("something".into()),
1165                        ]),
1166                    ]),
1167                ]),
1168                Value::Array(vec![Value::BulkString("123456789-0".into())]),
1169            ]);
1170
1171            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1172
1173            assert!(reply.is_ok());
1174
1175            let reply = reply.unwrap();
1176
1177            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1178            assert_eq!(reply.claimed.len(), 2);
1179            assert_eq!(reply.claimed[0].id.as_str(), "1713465533411-0");
1180            assert!(
1181                matches!(reply.claimed[0].map.get("name"), Some(Value::BulkString(v)) if v == "test".as_bytes())
1182            );
1183            assert_eq!(reply.claimed[1].id.as_str(), "1713465536069-0");
1184            assert_eq!(reply.deleted_ids.len(), 1);
1185            assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
1186        }
1187
1188        #[test]
1189        fn parses_v6_response() {
1190            let value = Value::Array(vec![
1191                Value::BulkString("1713465536578-0".into()),
1192                Value::Array(vec![
1193                    Value::Array(vec![
1194                        Value::BulkString("1713465533411-0".into()),
1195                        Value::Array(vec![
1196                            Value::BulkString("name".into()),
1197                            Value::BulkString("test".into()),
1198                            Value::BulkString("other".into()),
1199                            Value::BulkString("whaterver".into()),
1200                        ]),
1201                    ]),
1202                    Value::Array(vec![
1203                        Value::BulkString("1713465536069-0".into()),
1204                        Value::Array(vec![
1205                            Value::BulkString("name".into()),
1206                            Value::BulkString("another test".into()),
1207                            Value::BulkString("other".into()),
1208                            Value::BulkString("something".into()),
1209                        ]),
1210                    ]),
1211                ]),
1212                ]);
1214
1215            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1216
1217            assert!(reply.is_ok());
1218
1219            let reply = reply.unwrap();
1220
1221            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1222            assert_eq!(reply.claimed.len(), 2);
1223            let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1224            assert!(ids.contains(&"1713465533411-0"));
1225            assert!(ids.contains(&"1713465536069-0"));
1226            assert_eq!(reply.deleted_ids.len(), 0);
1227        }
1228
1229        #[test]
1230        fn parses_justid_response() {
1231            let value = Value::Array(vec![
1232                Value::BulkString("1713465536578-0".into()),
1233                Value::Array(vec![
1234                    Value::BulkString("1713465533411-0".into()),
1235                    Value::BulkString("1713465536069-0".into()),
1236                ]),
1237                Value::Array(vec![Value::BulkString("123456789-0".into())]),
1238            ]);
1239
1240            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1241
1242            assert!(reply.is_ok());
1243
1244            let reply = reply.unwrap();
1245
1246            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1247            assert_eq!(reply.claimed.len(), 2);
1248            let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1249            assert!(ids.contains(&"1713465533411-0"));
1250            assert!(ids.contains(&"1713465536069-0"));
1251            assert_eq!(reply.deleted_ids.len(), 1);
1252            assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
1253        }
1254
1255        #[test]
1256        fn parses_v6_justid_response() {
1257            let value = Value::Array(vec![
1258                Value::BulkString("1713465536578-0".into()),
1259                Value::Array(vec![
1260                    Value::BulkString("1713465533411-0".into()),
1261                    Value::BulkString("1713465536069-0".into()),
1262                ]),
1263                ]);
1265
1266            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1267
1268            assert!(reply.is_ok());
1269
1270            let reply = reply.unwrap();
1271
1272            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1273            assert_eq!(reply.claimed.len(), 2);
1274            let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1275            assert!(ids.contains(&"1713465533411-0"));
1276            assert!(ids.contains(&"1713465536069-0"));
1277            assert_eq!(reply.deleted_ids.len(), 0);
1278        }
1279    }
1280
1281    mod stream_trim_options {
1282        use super::*;
1283
1284        #[test]
1285        fn maxlen_trim() {
1286            let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10);
1287
1288            assert_command_eq(options, b"MAXLEN ~ 10");
1289        }
1290
1291        #[test]
1292        fn maxlen_exact_trim() {
1293            let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Exact, 10);
1294
1295            assert_command_eq(options, b"MAXLEN = 10");
1296        }
1297
1298        #[test]
1299        fn maxlen_trim_limit() {
1300            let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10).limit(5);
1301
1302            assert_command_eq(options, b"MAXLEN ~ 10 LIMIT 5");
1303        }
1304        #[test]
1305        fn minid_trim_limit() {
1306            let options = StreamTrimOptions::minid(StreamTrimmingMode::Exact, "123456-7").limit(5);
1307
1308            assert_command_eq(options, b"MINID = 123456-7 LIMIT 5");
1309        }
1310    }
1311
1312    mod stream_add_options {
1313        use super::*;
1314
1315        #[test]
1316        fn the_default() {
1317            let options = StreamAddOptions::default();
1318
1319            assert_command_eq(options, b"");
1320        }
1321
1322        #[test]
1323        fn with_maxlen_trim() {
1324            let options = StreamAddOptions::default()
1325                .trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10));
1326
1327            assert_command_eq(options, b"MAXLEN = 10");
1328        }
1329
1330        #[test]
1331        fn with_nomkstream() {
1332            let options = StreamAddOptions::default().nomkstream();
1333
1334            assert_command_eq(options, b"NOMKSTREAM");
1335        }
1336
1337        #[test]
1338        fn with_nomkstream_and_maxlen_trim() {
1339            let options = StreamAddOptions::default()
1340                .nomkstream()
1341                .trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10));
1342
1343            assert_command_eq(options, b"NOMKSTREAM MAXLEN = 10");
1344        }
1345    }
1346}