1use crate::{
4 from_redis_value, types::HashMap, FromRedisValue, RedisResult, RedisWrite, ToRedisArgs, Value,
5};
6
7use std::io::{Error, ErrorKind};
8
// Reports a response that could not be converted into the requested type.
//
// `$v` is rendered (via `{:?}`) as the offending response and `$det` as the
// accompanying detail. The error kind is `TypeError`. The actual early exit is
// performed by `fail!`, which is defined outside this file (presumably it
// expands to `return Err(..)` -- confirm against its definition).
macro_rules! invalid_type_error {
    ($v:expr, $det:expr) => {{
        fail!((
            $crate::ErrorKind::TypeError,
            "Response was of incompatible type",
            format!("{:?} (response was {:?})", $det, $v)
        ));
    }};
}
18
19#[derive(PartialEq, Eq, Clone, Debug, Copy)]
25pub enum StreamMaxlen {
26 Equals(usize),
28 Approx(usize),
30}
31
32impl ToRedisArgs for StreamMaxlen {
33 fn write_redis_args<W>(&self, out: &mut W)
34 where
35 W: ?Sized + RedisWrite,
36 {
37 let (ch, val) = match *self {
38 StreamMaxlen::Equals(v) => ("=", v),
39 StreamMaxlen::Approx(v) => ("~", v),
40 };
41 out.write_arg(b"MAXLEN");
42 out.write_arg(ch.as_bytes());
43 val.write_redis_args(out);
44 }
45}
46
47#[derive(Debug)]
50pub enum StreamTrimmingMode {
51 Exact,
53 Approx,
55}
56
57impl ToRedisArgs for StreamTrimmingMode {
58 fn write_redis_args<W>(&self, out: &mut W)
59 where
60 W: ?Sized + RedisWrite,
61 {
62 match self {
63 Self::Exact => out.write_arg(b"="),
64 Self::Approx => out.write_arg(b"~"),
65 };
66 }
67}
68
69#[derive(Debug)]
73pub enum StreamTrimStrategy {
74 MaxLen(StreamTrimmingMode, usize, Option<usize>),
76 MinId(StreamTrimmingMode, String, Option<usize>),
78}
79
80impl StreamTrimStrategy {
81 pub fn maxlen(trim: StreamTrimmingMode, max_entries: usize) -> Self {
83 Self::MaxLen(trim, max_entries, None)
84 }
85
86 pub fn minid(trim: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
88 Self::MinId(trim, stream_id.into(), None)
89 }
90
91 pub fn limit(self, limit: usize) -> Self {
93 match self {
94 StreamTrimStrategy::MaxLen(m, t, _) => StreamTrimStrategy::MaxLen(m, t, Some(limit)),
95 StreamTrimStrategy::MinId(m, t, _) => StreamTrimStrategy::MinId(m, t, Some(limit)),
96 }
97 }
98}
99
100impl ToRedisArgs for StreamTrimStrategy {
101 fn write_redis_args<W>(&self, out: &mut W)
102 where
103 W: ?Sized + RedisWrite,
104 {
105 let limit = match self {
106 StreamTrimStrategy::MaxLen(m, t, limit) => {
107 out.write_arg(b"MAXLEN");
108 m.write_redis_args(out);
109 t.write_redis_args(out);
110 limit
111 }
112 StreamTrimStrategy::MinId(m, t, limit) => {
113 out.write_arg(b"MINID");
114 m.write_redis_args(out);
115 t.write_redis_args(out);
116 limit
117 }
118 };
119 if let Some(limit) = limit {
120 out.write_arg(b"LIMIT");
121 limit.write_redis_args(out);
122 }
123 }
124}
125
126#[derive(Debug)]
131pub struct StreamTrimOptions {
132 strategy: StreamTrimStrategy,
133}
134
135impl StreamTrimOptions {
136 pub fn maxlen(mode: StreamTrimmingMode, max_entries: usize) -> Self {
138 Self {
139 strategy: StreamTrimStrategy::maxlen(mode, max_entries),
140 }
141 }
142
143 pub fn minid(mode: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
145 Self {
146 strategy: StreamTrimStrategy::minid(mode, stream_id),
147 }
148 }
149
150 pub fn limit(mut self, limit: usize) -> Self {
152 self.strategy = self.strategy.limit(limit);
153 self
154 }
155}
156
157impl ToRedisArgs for StreamTrimOptions {
158 fn write_redis_args<W>(&self, out: &mut W)
159 where
160 W: ?Sized + RedisWrite,
161 {
162 self.strategy.write_redis_args(out);
163 }
164}
165
166#[derive(Default, Debug)]
171pub struct StreamAddOptions {
172 nomkstream: bool,
173 trim: Option<StreamTrimStrategy>,
174}
175
176impl StreamAddOptions {
177 pub fn nomkstream(mut self) -> Self {
179 self.nomkstream = true;
180 self
181 }
182
183 pub fn trim(mut self, trim: StreamTrimStrategy) -> Self {
185 self.trim = Some(trim);
186 self
187 }
188}
189
190impl ToRedisArgs for StreamAddOptions {
191 fn write_redis_args<W>(&self, out: &mut W)
192 where
193 W: ?Sized + RedisWrite,
194 {
195 if self.nomkstream {
196 out.write_arg(b"NOMKSTREAM");
197 }
198 if let Some(strategy) = self.trim.as_ref() {
199 strategy.write_redis_args(out);
200 }
201 }
202}
203
/// Optional arguments for an auto-claim request.
///
/// The default value has no count and `justid` disabled.
#[derive(Default, Debug)]
pub struct StreamAutoClaimOptions {
    /// Value emitted after `COUNT`, if set.
    count: Option<usize>,
    /// When `true`, `JUSTID` is emitted.
    justid: bool,
}

impl StreamAutoClaimOptions {
    /// Sets the `COUNT` value to `n`.
    pub fn count(self, n: usize) -> Self {
        Self {
            count: Some(n),
            ..self
        }
    }

    /// Turns on the `JUSTID` flag.
    pub fn with_justid(self) -> Self {
        Self {
            justid: true,
            ..self
        }
    }
}
228
229impl ToRedisArgs for StreamAutoClaimOptions {
230 fn write_redis_args<W>(&self, out: &mut W)
231 where
232 W: ?Sized + RedisWrite,
233 {
234 if let Some(ref count) = self.count {
235 out.write_arg(b"COUNT");
236 out.write_arg(format!("{count}").as_bytes());
237 }
238 if self.justid {
239 out.write_arg(b"JUSTID");
240 }
241 }
242}
243
/// Optional arguments for a claim request.
///
/// Every field starts unset / `false` in the default value.
#[derive(Default, Debug)]
pub struct StreamClaimOptions {
    /// Value emitted after `IDLE`, if set.
    idle: Option<usize>,
    /// Value emitted after `TIME`, if set.
    time: Option<usize>,
    /// Value emitted after `RETRYCOUNT`, if set.
    retry: Option<usize>,
    /// When `true`, `FORCE` is emitted.
    force: bool,
    /// When `true`, `JUSTID` is emitted.
    justid: bool,
    /// Value emitted after `LASTID`, if set.
    lastid: Option<String>,
}

impl StreamClaimOptions {
    /// Sets the `IDLE` value.
    pub fn idle(self, ms: usize) -> Self {
        Self {
            idle: Some(ms),
            ..self
        }
    }

    /// Sets the `TIME` value.
    pub fn time(self, ms_time: usize) -> Self {
        Self {
            time: Some(ms_time),
            ..self
        }
    }

    /// Sets the `RETRYCOUNT` value.
    pub fn retry(self, count: usize) -> Self {
        Self {
            retry: Some(count),
            ..self
        }
    }

    /// Turns on the `FORCE` flag.
    pub fn with_force(self) -> Self {
        Self {
            force: true,
            ..self
        }
    }

    /// Turns on the `JUSTID` flag.
    pub fn with_justid(self) -> Self {
        Self {
            justid: true,
            ..self
        }
    }

    /// Sets the `LASTID` value.
    pub fn with_lastid(self, lastid: impl Into<String>) -> Self {
        Self {
            lastid: Some(lastid.into()),
            ..self
        }
    }
}
303
304impl ToRedisArgs for StreamClaimOptions {
305 fn write_redis_args<W>(&self, out: &mut W)
306 where
307 W: ?Sized + RedisWrite,
308 {
309 if let Some(ref ms) = self.idle {
310 out.write_arg(b"IDLE");
311 out.write_arg(format!("{ms}").as_bytes());
312 }
313 if let Some(ref ms_time) = self.time {
314 out.write_arg(b"TIME");
315 out.write_arg(format!("{ms_time}").as_bytes());
316 }
317 if let Some(ref count) = self.retry {
318 out.write_arg(b"RETRYCOUNT");
319 out.write_arg(format!("{count}").as_bytes());
320 }
321 if self.force {
322 out.write_arg(b"FORCE");
323 }
324 if self.justid {
325 out.write_arg(b"JUSTID");
326 }
327 if let Some(ref lastid) = self.lastid {
328 out.write_arg(b"LASTID");
329 lastid.write_redis_args(out);
330 }
331 }
332}
333
334type SRGroup = Option<(Vec<Vec<u8>>, Vec<Vec<u8>>)>;
338#[derive(Default, Debug)]
343pub struct StreamReadOptions {
344 block: Option<usize>,
346 count: Option<usize>,
348 noack: Option<bool>,
350 group: SRGroup,
353}
354
355impl StreamReadOptions {
356 pub fn read_only(&self) -> bool {
359 self.group.is_none()
360 }
361
362 pub fn noack(mut self) -> Self {
366 self.noack = Some(true);
367 self
368 }
369
370 pub fn block(mut self, ms: usize) -> Self {
372 self.block = Some(ms);
373 self
374 }
375
376 pub fn count(mut self, n: usize) -> Self {
378 self.count = Some(n);
379 self
380 }
381
382 pub fn group<GN: ToRedisArgs, CN: ToRedisArgs>(
384 mut self,
385 group_name: GN,
386 consumer_name: CN,
387 ) -> Self {
388 self.group = Some((
389 ToRedisArgs::to_redis_args(&group_name),
390 ToRedisArgs::to_redis_args(&consumer_name),
391 ));
392 self
393 }
394}
395
396impl ToRedisArgs for StreamReadOptions {
397 fn write_redis_args<W>(&self, out: &mut W)
398 where
399 W: ?Sized + RedisWrite,
400 {
401 if let Some(ref group) = self.group {
402 out.write_arg(b"GROUP");
403 for i in &group.0 {
404 out.write_arg(i);
405 }
406 for i in &group.1 {
407 out.write_arg(i);
408 }
409 }
410
411 if let Some(ref ms) = self.block {
412 out.write_arg(b"BLOCK");
413 out.write_arg(format!("{ms}").as_bytes());
414 }
415
416 if let Some(ref n) = self.count {
417 out.write_arg(b"COUNT");
418 out.write_arg(format!("{n}").as_bytes());
419 }
420
421 if self.group.is_some() {
422 if self.noack == Some(true) {
424 out.write_arg(b"NOACK");
425 }
426 }
427 }
428}
429
/// Parsed auto-claim reply.
///
/// Built from a two- or three-element array reply (see the `FromRedisValue`
/// impl in this file).
#[derive(Default, Debug, Clone)]
pub struct StreamAutoClaimReply {
    /// First element of the reply, converted to a string.
    pub next_stream_id: String,
    /// Second element of the reply. Entries carry only an `id` (empty `map`)
    /// when the server returned plain ids.
    pub claimed: Vec<StreamId>,
    /// Third element of the reply; empty when the reply has only two elements.
    pub deleted_ids: Vec<String>,
}
443
/// Parsed stream-read reply: one [`StreamKey`] per stream key in the response.
#[derive(Default, Debug, Clone)]
pub struct StreamReadReply {
    /// Stream keys together with the entries returned for each.
    pub keys: Vec<StreamKey>,
}
454
/// Parsed range reply: a flat list of stream entries.
#[derive(Default, Debug, Clone)]
pub struct StreamRangeReply {
    /// Entries (id plus field map) in the order they were parsed.
    pub ids: Vec<StreamId>,
}
471
/// Parsed claim reply: a flat list of stream entries.
#[derive(Default, Debug, Clone)]
pub struct StreamClaimReply {
    /// Entries (id plus field map) in the order they were parsed.
    pub ids: Vec<StreamId>,
}
483
484#[derive(Debug, Clone, Default)]
492pub enum StreamPendingReply {
493 #[default]
495 Empty,
496 Data(StreamPendingData),
498}
499
500impl StreamPendingReply {
501 pub fn count(&self) -> usize {
503 match self {
504 StreamPendingReply::Empty => 0,
505 StreamPendingReply::Data(x) => x.count,
506 }
507 }
508}
509
/// Payload of [`StreamPendingReply::Data`].
#[derive(Default, Debug, Clone)]
pub struct StreamPendingData {
    /// Reported number of pending entries (non-zero when produced by parsing).
    pub count: usize,
    /// Start id reported alongside the count.
    pub start_id: String,
    /// End id reported alongside the count.
    pub end_id: String,
    /// Per-consumer data. When produced by parsing a pending summary, only
    /// `name` and `pending` are filled in; `idle` keeps its default.
    pub consumers: Vec<StreamInfoConsumer>,
}
526
/// Parsed detailed pending reply: one [`StreamPendingId`] per reply row.
#[derive(Default, Debug, Clone)]
pub struct StreamPendingCountReply {
    /// Rows in the order they appeared in the reply.
    pub ids: Vec<StreamPendingId>,
}
543
/// Parsed stream-info reply.
///
/// Fields whose key is missing from the reply keep their default value.
#[derive(Default, Debug, Clone)]
pub struct StreamInfoStreamReply {
    /// Value of the `last-generated-id` key.
    pub last_generated_id: String,
    /// Radix-tree key count (presumably the `radix-tree-keys` entry of the
    /// reply -- confirm against the parser).
    pub radix_tree_keys: usize,
    /// Value of the `groups` key.
    pub groups: usize,
    /// Value of the `length` key.
    pub length: usize,
    /// Entry parsed from the `first-entry` key.
    pub first_entry: StreamId,
    /// Entry parsed from the `last-entry` key.
    pub last_entry: StreamId,
}
569
/// Parsed consumers-info reply: one [`StreamInfoConsumer`] per reply row.
#[derive(Default, Debug, Clone)]
pub struct StreamInfoConsumersReply {
    /// Consumers in reply order.
    pub consumers: Vec<StreamInfoConsumer>,
}
580
/// Parsed groups-info reply: one [`StreamInfoGroup`] per reply row.
#[derive(Default, Debug, Clone)]
pub struct StreamInfoGroupsReply {
    /// Groups in reply order.
    pub groups: Vec<StreamInfoGroup>,
}
593
/// A single consumer as reported by the server.
#[derive(Default, Debug, Clone)]
pub struct StreamInfoConsumer {
    /// Consumer name.
    pub name: String,
    /// Pending count reported for this consumer.
    pub pending: usize,
    /// Value of the `idle` key (unit not visible here; presumably
    /// milliseconds -- confirm).
    pub idle: usize,
}
607
/// A single consumer group as reported by the server.
#[derive(Default, Debug, Clone)]
pub struct StreamInfoGroup {
    /// Value of the `name` key.
    pub name: String,
    /// Value of the `consumers` key.
    pub consumers: usize,
    /// Value of the `pending` key.
    pub pending: usize,
    /// Value of the `last-delivered-id` key.
    pub last_delivered_id: String,
    /// Value of the `entries-read` key; `None` when the key is absent or nil.
    pub entries_read: Option<usize>,
    /// Value of the `lag` key; `None` when the key is absent or nil.
    pub lag: Option<usize>,
}
629
/// One row of a detailed pending reply, parsed from a four-element array of
/// two bulk strings followed by two integers.
#[derive(Default, Debug, Clone)]
pub struct StreamPendingId {
    /// First element: the entry id.
    pub id: String,
    /// Second element: the consumer name.
    pub consumer: String,
    /// Third element (presumably milliseconds since last delivery -- confirm).
    pub last_delivered_ms: usize,
    /// Fourth element: the delivery counter.
    pub times_delivered: usize,
}
647
/// A stream key together with the entries returned for it.
#[derive(Default, Debug, Clone)]
pub struct StreamKey {
    /// The stream key.
    pub key: String,
    /// Entries returned for this key.
    pub ids: Vec<StreamId>,
}
656
657#[derive(Default, Debug, Clone)]
659pub struct StreamId {
660 pub id: String,
662 pub map: HashMap<String, Value>,
664}
665
666impl StreamId {
667 fn from_array_value(v: &Value) -> RedisResult<Self> {
669 let mut stream_id = StreamId::default();
670 if let Value::Array(ref values) = *v {
671 if let Some(v) = values.first() {
672 stream_id.id = from_redis_value(v)?;
673 }
674 if let Some(v) = values.get(1) {
675 stream_id.map = from_redis_value(v)?;
676 }
677 }
678
679 Ok(stream_id)
680 }
681
682 pub fn get<T: FromRedisValue>(&self, key: &str) -> Option<T> {
685 match self.map.get(key) {
686 Some(x) => from_redis_value(x).ok(),
687 None => None,
688 }
689 }
690
691 pub fn contains_key(&self, key: &str) -> bool {
693 self.map.contains_key(key)
694 }
695
696 pub fn len(&self) -> usize {
698 self.map.len()
699 }
700
701 pub fn is_empty(&self) -> bool {
703 self.len() == 0
704 }
705}
706
707type SACRows = Vec<HashMap<String, HashMap<String, Value>>>;
708
709impl FromRedisValue for StreamAutoClaimReply {
710 fn from_redis_value(v: &Value) -> RedisResult<Self> {
711 match *v {
712 Value::Array(ref items) => {
713 if let 2..=3 = items.len() {
714 let deleted_ids = if let Some(o) = items.get(2) {
715 from_redis_value(o)?
716 } else {
717 Vec::new()
718 };
719
720 let claimed: Vec<StreamId> = match &items[1] {
721 Value::Array(x)
723 if matches!(x.first(), None | Some(Value::BulkString(_))) =>
724 {
725 let ids: Vec<String> = from_redis_value(&items[1])?;
726
727 ids.into_iter()
728 .map(|id| StreamId {
729 id,
730 ..Default::default()
731 })
732 .collect()
733 }
734 Value::Array(x) if matches!(x.first(), Some(Value::Array(_))) => {
736 let rows: SACRows = from_redis_value(&items[1])?;
737
738 rows.into_iter()
739 .flat_map(|id_row| {
740 id_row.into_iter().map(|(id, map)| StreamId { id, map })
741 })
742 .collect()
743 }
744 _ => invalid_type_error!("Incorrect type", &items[1]),
745 };
746
747 Ok(Self {
748 next_stream_id: from_redis_value(&items[0])?,
749 claimed,
750 deleted_ids,
751 })
752 } else {
753 invalid_type_error!("Wrong number of entries in array response", v)
754 }
755 }
756 _ => invalid_type_error!("Not a array response", v),
757 }
758 }
759}
760
761type SRRows = Vec<HashMap<String, Vec<HashMap<String, HashMap<String, Value>>>>>;
762impl FromRedisValue for StreamReadReply {
763 fn from_redis_value(v: &Value) -> RedisResult<Self> {
764 let rows: SRRows = from_redis_value(v)?;
765 let keys = rows
766 .into_iter()
767 .flat_map(|row| {
768 row.into_iter().map(|(key, entry)| {
769 let ids = entry
770 .into_iter()
771 .flat_map(|id_row| id_row.into_iter().map(|(id, map)| StreamId { id, map }))
772 .collect();
773 StreamKey { key, ids }
774 })
775 })
776 .collect();
777 Ok(StreamReadReply { keys })
778 }
779}
780
781impl FromRedisValue for StreamRangeReply {
782 fn from_redis_value(v: &Value) -> RedisResult<Self> {
783 let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
784 let ids: Vec<StreamId> = rows
785 .into_iter()
786 .flat_map(|row| row.into_iter().map(|(id, map)| StreamId { id, map }))
787 .collect();
788 Ok(StreamRangeReply { ids })
789 }
790}
791
792impl FromRedisValue for StreamClaimReply {
793 fn from_redis_value(v: &Value) -> RedisResult<Self> {
794 let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
795 let ids: Vec<StreamId> = rows
796 .into_iter()
797 .flat_map(|row| row.into_iter().map(|(id, map)| StreamId { id, map }))
798 .collect();
799 Ok(StreamClaimReply { ids })
800 }
801}
802
803type SPRInner = (
804 usize,
805 Option<String>,
806 Option<String>,
807 Vec<Option<(String, String)>>,
808);
809impl FromRedisValue for StreamPendingReply {
810 fn from_redis_value(v: &Value) -> RedisResult<Self> {
811 let (count, start, end, consumer_data): SPRInner = from_redis_value(v)?;
812
813 if count == 0 {
814 Ok(StreamPendingReply::Empty)
815 } else {
816 let mut result = StreamPendingData::default();
817
818 let start_id = start.ok_or_else(|| {
819 Error::new(
820 ErrorKind::Other,
821 "IllegalState: Non-zero pending expects start id",
822 )
823 })?;
824
825 let end_id = end.ok_or_else(|| {
826 Error::new(
827 ErrorKind::Other,
828 "IllegalState: Non-zero pending expects end id",
829 )
830 })?;
831
832 result.count = count;
833 result.start_id = start_id;
834 result.end_id = end_id;
835
836 result.consumers = consumer_data
837 .into_iter()
838 .flatten()
839 .map(|(name, pending)| StreamInfoConsumer {
840 name,
841 pending: pending.parse().unwrap_or_default(),
842 ..Default::default()
843 })
844 .collect();
845
846 Ok(StreamPendingReply::Data(result))
847 }
848 }
849}
850
851impl FromRedisValue for StreamPendingCountReply {
852 fn from_redis_value(v: &Value) -> RedisResult<Self> {
853 let mut reply = StreamPendingCountReply::default();
854 match v {
855 Value::Array(outer_tuple) => {
856 for outer in outer_tuple {
857 match outer {
858 Value::Array(inner_tuple) => match &inner_tuple[..] {
859 [Value::BulkString(id_bytes), Value::BulkString(consumer_bytes), Value::Int(last_delivered_ms_u64), Value::Int(times_delivered_u64)] =>
860 {
861 let id = String::from_utf8(id_bytes.to_vec())?;
862 let consumer = String::from_utf8(consumer_bytes.to_vec())?;
863 let last_delivered_ms = *last_delivered_ms_u64 as usize;
864 let times_delivered = *times_delivered_u64 as usize;
865 reply.ids.push(StreamPendingId {
866 id,
867 consumer,
868 last_delivered_ms,
869 times_delivered,
870 });
871 }
872 _ => fail!((
873 crate::types::ErrorKind::TypeError,
874 "Cannot parse redis data (3)"
875 )),
876 },
877 _ => fail!((
878 crate::types::ErrorKind::TypeError,
879 "Cannot parse redis data (2)"
880 )),
881 }
882 }
883 }
884 _ => fail!((
885 crate::types::ErrorKind::TypeError,
886 "Cannot parse redis data (1)"
887 )),
888 };
889 Ok(reply)
890 }
891}
892
893impl FromRedisValue for StreamInfoStreamReply {
894 fn from_redis_value(v: &Value) -> RedisResult<Self> {
895 let map: HashMap<String, Value> = from_redis_value(v)?;
896 let mut reply = StreamInfoStreamReply::default();
897 if let Some(v) = &map.get("last-generated-id") {
898 reply.last_generated_id = from_redis_value(v)?;
899 }
900 if let Some(v) = &map.get("radix-tree-nodes") {
901 reply.radix_tree_keys = from_redis_value(v)?;
902 }
903 if let Some(v) = &map.get("groups") {
904 reply.groups = from_redis_value(v)?;
905 }
906 if let Some(v) = &map.get("length") {
907 reply.length = from_redis_value(v)?;
908 }
909 if let Some(v) = &map.get("first-entry") {
910 reply.first_entry = StreamId::from_array_value(v)?;
911 }
912 if let Some(v) = &map.get("last-entry") {
913 reply.last_entry = StreamId::from_array_value(v)?;
914 }
915 Ok(reply)
916 }
917}
918
919impl FromRedisValue for StreamInfoConsumersReply {
920 fn from_redis_value(v: &Value) -> RedisResult<Self> {
921 let consumers: Vec<HashMap<String, Value>> = from_redis_value(v)?;
922 let mut reply = StreamInfoConsumersReply::default();
923 for map in consumers {
924 let mut c = StreamInfoConsumer::default();
925 if let Some(v) = &map.get("name") {
926 c.name = from_redis_value(v)?;
927 }
928 if let Some(v) = &map.get("pending") {
929 c.pending = from_redis_value(v)?;
930 }
931 if let Some(v) = &map.get("idle") {
932 c.idle = from_redis_value(v)?;
933 }
934 reply.consumers.push(c);
935 }
936
937 Ok(reply)
938 }
939}
940
941impl FromRedisValue for StreamInfoGroupsReply {
942 fn from_redis_value(v: &Value) -> RedisResult<Self> {
943 let groups: Vec<HashMap<String, Value>> = from_redis_value(v)?;
944 let mut reply = StreamInfoGroupsReply::default();
945 for map in groups {
946 let mut g = StreamInfoGroup::default();
947 if let Some(v) = &map.get("name") {
948 g.name = from_redis_value(v)?;
949 }
950 if let Some(v) = &map.get("pending") {
951 g.pending = from_redis_value(v)?;
952 }
953 if let Some(v) = &map.get("consumers") {
954 g.consumers = from_redis_value(v)?;
955 }
956 if let Some(v) = &map.get("last-delivered-id") {
957 g.last_delivered_id = from_redis_value(v)?;
958 }
959 if let Some(v) = &map.get("entries-read") {
960 g.entries_read = if let Value::Nil = v {
961 None
962 } else {
963 Some(from_redis_value(v)?)
964 };
965 }
966 if let Some(v) = &map.get("lag") {
967 g.lag = if let Value::Nil = v {
968 None
969 } else {
970 Some(from_redis_value(v)?)
971 };
972 }
973 reply.groups.push(g);
974 }
975 Ok(reply)
976 }
977}
978
979#[cfg(test)]
980mod tests {
981 use super::*;
982
983 fn assert_command_eq(object: impl ToRedisArgs, expected: &[u8]) {
984 let mut out: Vec<Vec<u8>> = Vec::new();
985
986 object.write_redis_args(&mut out);
987
988 let mut cmd: Vec<u8> = Vec::new();
989
990 out.iter_mut().for_each(|item| {
991 cmd.append(item);
992 cmd.push(b' ');
993 });
994
995 cmd.pop();
996
997 assert_eq!(cmd, expected);
998 }
999
    /// Parsing tests for `StreamAutoClaimReply`.
    mod stream_auto_claim_reply {
        use super::*;
        use crate::Value;

        /// A one-element array is too short and must be rejected.
        #[test]
        fn short_response() {
            let value = Value::Array(vec![Value::BulkString("1713465536578-0".into())]);

            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);

            assert!(reply.is_err());
        }

        /// Empty claimed and deleted arrays parse to empty lists.
        #[test]
        fn parses_none_claimed_response() {
            let value = Value::Array(vec![
                Value::BulkString("0-0".into()),
                Value::Array(vec![]),
                Value::Array(vec![]),
            ]);

            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);

            assert!(reply.is_ok());

            let reply = reply.unwrap();

            assert_eq!(reply.next_stream_id.as_str(), "0-0");
            assert_eq!(reply.claimed.len(), 0);
            assert_eq!(reply.deleted_ids.len(), 0);
        }

        /// Three-element reply with full `[id, fields]` entries.
        #[test]
        fn parses_response() {
            let value = Value::Array(vec![
                Value::BulkString("1713465536578-0".into()),
                Value::Array(vec![
                    Value::Array(vec![
                        Value::BulkString("1713465533411-0".into()),
                        Value::Array(vec![
                            Value::BulkString("name".into()),
                            Value::BulkString("test".into()),
                            Value::BulkString("other".into()),
                            Value::BulkString("whaterver".into()),
                        ]),
                    ]),
                    Value::Array(vec![
                        Value::BulkString("1713465536069-0".into()),
                        Value::Array(vec![
                            Value::BulkString("name".into()),
                            Value::BulkString("another test".into()),
                            Value::BulkString("other".into()),
                            Value::BulkString("something".into()),
                        ]),
                    ]),
                ]),
                Value::Array(vec![Value::BulkString("123456789-0".into())]),
            ]);

            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);

            assert!(reply.is_ok());

            let reply = reply.unwrap();

            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
            assert_eq!(reply.claimed.len(), 2);
            assert_eq!(reply.claimed[0].id.as_str(), "1713465533411-0");
            assert!(
                matches!(reply.claimed[0].map.get("name"), Some(Value::BulkString(v)) if v == "test".as_bytes())
            );
            assert_eq!(reply.claimed[1].id.as_str(), "1713465536069-0");
            assert_eq!(reply.deleted_ids.len(), 1);
            assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
        }

        /// Two-element reply (no deleted-ids array) with full entries.
        #[test]
        fn parses_v6_response() {
            let value = Value::Array(vec![
                Value::BulkString("1713465536578-0".into()),
                Value::Array(vec![
                    Value::Array(vec![
                        Value::BulkString("1713465533411-0".into()),
                        Value::Array(vec![
                            Value::BulkString("name".into()),
                            Value::BulkString("test".into()),
                            Value::BulkString("other".into()),
                            Value::BulkString("whaterver".into()),
                        ]),
                    ]),
                    Value::Array(vec![
                        Value::BulkString("1713465536069-0".into()),
                        Value::Array(vec![
                            Value::BulkString("name".into()),
                            Value::BulkString("another test".into()),
                            Value::BulkString("other".into()),
                            Value::BulkString("something".into()),
                        ]),
                    ]),
                ]),
            ]);

            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);

            assert!(reply.is_ok());

            let reply = reply.unwrap();

            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
            assert_eq!(reply.claimed.len(), 2);
            let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
            assert!(ids.contains(&"1713465533411-0"));
            assert!(ids.contains(&"1713465536069-0"));
            assert_eq!(reply.deleted_ids.len(), 0);
        }

        /// Three-element reply whose claimed array holds plain ids.
        #[test]
        fn parses_justid_response() {
            let value = Value::Array(vec![
                Value::BulkString("1713465536578-0".into()),
                Value::Array(vec![
                    Value::BulkString("1713465533411-0".into()),
                    Value::BulkString("1713465536069-0".into()),
                ]),
                Value::Array(vec![Value::BulkString("123456789-0".into())]),
            ]);

            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);

            assert!(reply.is_ok());

            let reply = reply.unwrap();

            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
            assert_eq!(reply.claimed.len(), 2);
            let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
            assert!(ids.contains(&"1713465533411-0"));
            assert!(ids.contains(&"1713465536069-0"));
            assert_eq!(reply.deleted_ids.len(), 1);
            assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
        }

        /// Two-element reply (no deleted-ids array) with plain ids.
        #[test]
        fn parses_v6_justid_response() {
            let value = Value::Array(vec![
                Value::BulkString("1713465536578-0".into()),
                Value::Array(vec![
                    Value::BulkString("1713465533411-0".into()),
                    Value::BulkString("1713465536069-0".into()),
                ]),
            ]);

            let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);

            assert!(reply.is_ok());

            let reply = reply.unwrap();

            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
            assert_eq!(reply.claimed.len(), 2);
            let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
            assert!(ids.contains(&"1713465533411-0"));
            assert!(ids.contains(&"1713465536069-0"));
            assert_eq!(reply.deleted_ids.len(), 0);
        }
    }
1169
1170 mod stream_trim_options {
1171 use super::*;
1172
1173 #[test]
1174 fn maxlen_trim() {
1175 let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10);
1176
1177 assert_command_eq(options, b"MAXLEN ~ 10");
1178 }
1179
1180 #[test]
1181 fn maxlen_exact_trim() {
1182 let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Exact, 10);
1183
1184 assert_command_eq(options, b"MAXLEN = 10");
1185 }
1186
1187 #[test]
1188 fn maxlen_trim_limit() {
1189 let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10).limit(5);
1190
1191 assert_command_eq(options, b"MAXLEN ~ 10 LIMIT 5");
1192 }
1193 #[test]
1194 fn minid_trim_limit() {
1195 let options = StreamTrimOptions::minid(StreamTrimmingMode::Exact, "123456-7").limit(5);
1196
1197 assert_command_eq(options, b"MINID = 123456-7 LIMIT 5");
1198 }
1199 }
1200
1201 mod stream_add_options {
1202 use super::*;
1203
1204 #[test]
1205 fn the_default() {
1206 let options = StreamAddOptions::default();
1207
1208 assert_command_eq(options, b"");
1209 }
1210
1211 #[test]
1212 fn with_maxlen_trim() {
1213 let options = StreamAddOptions::default()
1214 .trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10));
1215
1216 assert_command_eq(options, b"MAXLEN = 10");
1217 }
1218
1219 #[test]
1220 fn with_nomkstream() {
1221 let options = StreamAddOptions::default().nomkstream();
1222
1223 assert_command_eq(options, b"NOMKSTREAM");
1224 }
1225
1226 #[test]
1227 fn with_nomkstream_and_maxlen_trim() {
1228 let options = StreamAddOptions::default()
1229 .nomkstream()
1230 .trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10));
1231
1232 assert_command_eq(options, b"NOMKSTREAM MAXLEN = 10");
1233 }
1234 }
1235}