1use crate::{
4 from_redis_value, types::HashMap, ErrorKind, FromRedisValue, RedisError, RedisResult,
5 RedisWrite, ToRedisArgs, Value,
6};
7
8use std::io::Error;
9
10macro_rules! invalid_type_error {
11 ($v:expr, $det:expr) => {{
12 fail!((
13 $crate::ErrorKind::TypeError,
14 "Response was of incompatible type",
15 format!("{:?} (response was {:?})", $det, $v)
16 ));
17 }};
18}
19
20#[derive(PartialEq, Eq, Clone, Debug, Copy)]
26pub enum StreamMaxlen {
27 Equals(usize),
29 Approx(usize),
31}
32
33impl ToRedisArgs for StreamMaxlen {
34 fn write_redis_args<W>(&self, out: &mut W)
35 where
36 W: ?Sized + RedisWrite,
37 {
38 let (ch, val) = match *self {
39 StreamMaxlen::Equals(v) => ("=", v),
40 StreamMaxlen::Approx(v) => ("~", v),
41 };
42 out.write_arg(b"MAXLEN");
43 out.write_arg(ch.as_bytes());
44 val.write_redis_args(out);
45 }
46}
47
48#[derive(Debug)]
51pub enum StreamTrimmingMode {
52 Exact,
54 Approx,
56}
57
58impl ToRedisArgs for StreamTrimmingMode {
59 fn write_redis_args<W>(&self, out: &mut W)
60 where
61 W: ?Sized + RedisWrite,
62 {
63 match self {
64 Self::Exact => out.write_arg(b"="),
65 Self::Approx => out.write_arg(b"~"),
66 };
67 }
68}
69
70#[derive(Debug)]
74pub enum StreamTrimStrategy {
75 MaxLen(StreamTrimmingMode, usize, Option<usize>),
77 MinId(StreamTrimmingMode, String, Option<usize>),
79}
80
81impl StreamTrimStrategy {
82 pub fn maxlen(trim: StreamTrimmingMode, max_entries: usize) -> Self {
84 Self::MaxLen(trim, max_entries, None)
85 }
86
87 pub fn minid(trim: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
89 Self::MinId(trim, stream_id.into(), None)
90 }
91
92 pub fn limit(self, limit: usize) -> Self {
94 match self {
95 StreamTrimStrategy::MaxLen(m, t, _) => StreamTrimStrategy::MaxLen(m, t, Some(limit)),
96 StreamTrimStrategy::MinId(m, t, _) => StreamTrimStrategy::MinId(m, t, Some(limit)),
97 }
98 }
99}
100
101impl ToRedisArgs for StreamTrimStrategy {
102 fn write_redis_args<W>(&self, out: &mut W)
103 where
104 W: ?Sized + RedisWrite,
105 {
106 let limit = match self {
107 StreamTrimStrategy::MaxLen(m, t, limit) => {
108 out.write_arg(b"MAXLEN");
109 m.write_redis_args(out);
110 t.write_redis_args(out);
111 limit
112 }
113 StreamTrimStrategy::MinId(m, t, limit) => {
114 out.write_arg(b"MINID");
115 m.write_redis_args(out);
116 t.write_redis_args(out);
117 limit
118 }
119 };
120 if let Some(limit) = limit {
121 out.write_arg(b"LIMIT");
122 limit.write_redis_args(out);
123 }
124 }
125}
126
127#[derive(Debug)]
132pub struct StreamTrimOptions {
133 strategy: StreamTrimStrategy,
134 deletion_policy: Option<StreamDeletionPolicy>,
135}
136
137impl StreamTrimOptions {
138 pub fn maxlen(mode: StreamTrimmingMode, max_entries: usize) -> Self {
140 Self {
141 strategy: StreamTrimStrategy::maxlen(mode, max_entries),
142 deletion_policy: None,
143 }
144 }
145
146 pub fn minid(mode: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
148 Self {
149 strategy: StreamTrimStrategy::minid(mode, stream_id),
150 deletion_policy: None,
151 }
152 }
153
154 pub fn limit(mut self, limit: usize) -> Self {
156 self.strategy = self.strategy.limit(limit);
157 self
158 }
159
160 pub fn set_deletion_policy(mut self, deletion_policy: StreamDeletionPolicy) -> Self {
162 self.deletion_policy = Some(deletion_policy);
163 self
164 }
165}
166
167impl ToRedisArgs for StreamTrimOptions {
168 fn write_redis_args<W>(&self, out: &mut W)
169 where
170 W: ?Sized + RedisWrite,
171 {
172 self.strategy.write_redis_args(out);
173 if let Some(deletion_policy) = self.deletion_policy.as_ref() {
174 deletion_policy.write_redis_args(out);
175 }
176 }
177}
178
179#[derive(Default, Debug)]
184pub struct StreamAddOptions {
185 nomkstream: bool,
186 trim: Option<StreamTrimStrategy>,
187 deletion_policy: Option<StreamDeletionPolicy>,
188}
189
190impl StreamAddOptions {
191 pub fn nomkstream(mut self) -> Self {
193 self.nomkstream = true;
194 self
195 }
196
197 pub fn trim(mut self, trim: StreamTrimStrategy) -> Self {
199 self.trim = Some(trim);
200 self
201 }
202
203 pub fn set_deletion_policy(mut self, deletion_policy: StreamDeletionPolicy) -> Self {
205 self.deletion_policy = Some(deletion_policy);
206 self
207 }
208}
209
210impl ToRedisArgs for StreamAddOptions {
211 fn write_redis_args<W>(&self, out: &mut W)
212 where
213 W: ?Sized + RedisWrite,
214 {
215 if self.nomkstream {
216 out.write_arg(b"NOMKSTREAM");
217 }
218 if let Some(strategy) = self.trim.as_ref() {
219 strategy.write_redis_args(out);
220 }
221 if let Some(deletion_policy) = self.deletion_policy.as_ref() {
222 deletion_policy.write_redis_args(out);
223 }
224 }
225}
226
227#[derive(Default, Debug)]
232pub struct StreamAutoClaimOptions {
233 count: Option<usize>,
234 justid: bool,
235}
236
237impl StreamAutoClaimOptions {
238 pub fn count(mut self, n: usize) -> Self {
240 self.count = Some(n);
241 self
242 }
243
244 pub fn with_justid(mut self) -> Self {
247 self.justid = true;
248 self
249 }
250}
251
252impl ToRedisArgs for StreamAutoClaimOptions {
253 fn write_redis_args<W>(&self, out: &mut W)
254 where
255 W: ?Sized + RedisWrite,
256 {
257 if let Some(ref count) = self.count {
258 out.write_arg(b"COUNT");
259 out.write_arg(format!("{count}").as_bytes());
260 }
261 if self.justid {
262 out.write_arg(b"JUSTID");
263 }
264 }
265}
266
267#[derive(Default, Debug)]
272pub struct StreamClaimOptions {
273 idle: Option<usize>,
275 time: Option<usize>,
277 retry: Option<usize>,
279 force: bool,
281 justid: bool,
284 lastid: Option<String>,
286}
287
288impl StreamClaimOptions {
289 pub fn idle(mut self, ms: usize) -> Self {
291 self.idle = Some(ms);
292 self
293 }
294
295 pub fn time(mut self, ms_time: usize) -> Self {
297 self.time = Some(ms_time);
298 self
299 }
300
301 pub fn retry(mut self, count: usize) -> Self {
303 self.retry = Some(count);
304 self
305 }
306
307 pub fn with_force(mut self) -> Self {
309 self.force = true;
310 self
311 }
312
313 pub fn with_justid(mut self) -> Self {
316 self.justid = true;
317 self
318 }
319
320 pub fn with_lastid(mut self, lastid: impl Into<String>) -> Self {
322 self.lastid = Some(lastid.into());
323 self
324 }
325}
326
327impl ToRedisArgs for StreamClaimOptions {
328 fn write_redis_args<W>(&self, out: &mut W)
329 where
330 W: ?Sized + RedisWrite,
331 {
332 if let Some(ref ms) = self.idle {
333 out.write_arg(b"IDLE");
334 out.write_arg(format!("{ms}").as_bytes());
335 }
336 if let Some(ref ms_time) = self.time {
337 out.write_arg(b"TIME");
338 out.write_arg(format!("{ms_time}").as_bytes());
339 }
340 if let Some(ref count) = self.retry {
341 out.write_arg(b"RETRYCOUNT");
342 out.write_arg(format!("{count}").as_bytes());
343 }
344 if self.force {
345 out.write_arg(b"FORCE");
346 }
347 if self.justid {
348 out.write_arg(b"JUSTID");
349 }
350 if let Some(ref lastid) = self.lastid {
351 out.write_arg(b"LASTID");
352 lastid.write_redis_args(out);
353 }
354 }
355}
356
357type SRGroup = Option<(Vec<Vec<u8>>, Vec<Vec<u8>>)>;
361#[derive(Default, Debug)]
366pub struct StreamReadOptions {
367 block: Option<usize>,
369 count: Option<usize>,
371 noack: Option<bool>,
373 group: SRGroup,
376}
377
378impl StreamReadOptions {
379 pub fn read_only(&self) -> bool {
382 self.group.is_none()
383 }
384
385 pub fn noack(mut self) -> Self {
389 self.noack = Some(true);
390 self
391 }
392
393 pub fn block(mut self, ms: usize) -> Self {
395 self.block = Some(ms);
396 self
397 }
398
399 pub fn count(mut self, n: usize) -> Self {
401 self.count = Some(n);
402 self
403 }
404
405 pub fn group<GN: ToRedisArgs, CN: ToRedisArgs>(
407 mut self,
408 group_name: GN,
409 consumer_name: CN,
410 ) -> Self {
411 self.group = Some((
412 ToRedisArgs::to_redis_args(&group_name),
413 ToRedisArgs::to_redis_args(&consumer_name),
414 ));
415 self
416 }
417}
418
419impl ToRedisArgs for StreamReadOptions {
420 fn write_redis_args<W>(&self, out: &mut W)
421 where
422 W: ?Sized + RedisWrite,
423 {
424 if let Some(ref group) = self.group {
425 out.write_arg(b"GROUP");
426 for i in &group.0 {
427 out.write_arg(i);
428 }
429 for i in &group.1 {
430 out.write_arg(i);
431 }
432 }
433
434 if let Some(ref ms) = self.block {
435 out.write_arg(b"BLOCK");
436 out.write_arg(format!("{ms}").as_bytes());
437 }
438
439 if let Some(ref n) = self.count {
440 out.write_arg(b"COUNT");
441 out.write_arg(format!("{n}").as_bytes());
442 }
443
444 if self.group.is_some() {
445 if self.noack == Some(true) {
447 out.write_arg(b"NOACK");
448 }
449 }
450 }
451}
452
453#[derive(Default, Debug, Clone)]
458pub struct StreamAutoClaimReply {
459 pub next_stream_id: String,
461 pub claimed: Vec<StreamId>,
463 pub deleted_ids: Vec<String>,
465}
466
467#[derive(Default, Debug, Clone)]
473pub struct StreamReadReply {
474 pub keys: Vec<StreamKey>,
476}
477
478#[derive(Default, Debug, Clone)]
490pub struct StreamRangeReply {
491 pub ids: Vec<StreamId>,
493}
494
495#[derive(Default, Debug, Clone)]
502pub struct StreamClaimReply {
503 pub ids: Vec<StreamId>,
505}
506
507#[derive(Debug, Clone, Default)]
515pub enum StreamPendingReply {
516 #[default]
518 Empty,
519 Data(StreamPendingData),
521}
522
523impl StreamPendingReply {
524 pub fn count(&self) -> usize {
526 match self {
527 StreamPendingReply::Empty => 0,
528 StreamPendingReply::Data(x) => x.count,
529 }
530 }
531}
532
533#[derive(Default, Debug, Clone)]
537pub struct StreamPendingData {
538 pub count: usize,
540 pub start_id: String,
542 pub end_id: String,
544 pub consumers: Vec<StreamInfoConsumer>,
548}
549
550#[derive(Default, Debug, Clone)]
560pub struct StreamPendingCountReply {
561 pub ids: Vec<StreamPendingId>,
565}
566
567#[derive(Default, Debug, Clone)]
576pub struct StreamInfoStreamReply {
577 pub last_generated_id: String,
580 pub radix_tree_keys: usize,
583 pub groups: usize,
585 pub length: usize,
587 pub first_entry: StreamId,
589 pub last_entry: StreamId,
591}
592
593#[derive(Default, Debug, Clone)]
599pub struct StreamInfoConsumersReply {
600 pub consumers: Vec<StreamInfoConsumer>,
602}
603
604#[derive(Default, Debug, Clone)]
612pub struct StreamInfoGroupsReply {
613 pub groups: Vec<StreamInfoGroup>,
615}
616
617#[derive(Default, Debug, Clone)]
622pub struct StreamInfoConsumer {
623 pub name: String,
625 pub pending: usize,
627 pub idle: usize,
629}
630
631#[derive(Default, Debug, Clone)]
636pub struct StreamInfoGroup {
637 pub name: String,
639 pub consumers: usize,
641 pub pending: usize,
643 pub last_delivered_id: String,
645 pub entries_read: Option<usize>,
648 pub lag: Option<usize>,
651}
652
653#[derive(Default, Debug, Clone)]
657pub struct StreamPendingId {
658 pub id: String,
660 pub consumer: String,
664 pub last_delivered_ms: usize,
667 pub times_delivered: usize,
669}
670
671#[derive(Default, Debug, Clone)]
673pub struct StreamKey {
674 pub key: String,
676 pub ids: Vec<StreamId>,
678}
679
680#[derive(Default, Debug, Clone)]
682pub struct StreamId {
683 pub id: String,
685 pub map: HashMap<String, Value>,
687}
688
689impl StreamId {
690 fn from_array_value(v: &Value) -> RedisResult<Self> {
692 let mut stream_id = StreamId::default();
693 if let Value::Array(ref values) = *v {
694 if let Some(v) = values.first() {
695 stream_id.id = from_redis_value(v)?;
696 }
697 if let Some(v) = values.get(1) {
698 stream_id.map = from_redis_value(v)?;
699 }
700 }
701
702 Ok(stream_id)
703 }
704
705 pub fn get<T: FromRedisValue>(&self, key: &str) -> Option<T> {
708 match self.map.get(key) {
709 Some(x) => from_redis_value(x).ok(),
710 None => None,
711 }
712 }
713
714 pub fn contains_key(&self, key: &str) -> bool {
716 self.map.contains_key(key)
717 }
718
719 pub fn len(&self) -> usize {
721 self.map.len()
722 }
723
724 pub fn is_empty(&self) -> bool {
726 self.len() == 0
727 }
728}
729
730type SACRows = Vec<HashMap<String, HashMap<String, Value>>>;
731
732impl FromRedisValue for StreamAutoClaimReply {
733 fn from_redis_value(v: &Value) -> RedisResult<Self> {
734 match *v {
735 Value::Array(ref items) => {
736 if let 2..=3 = items.len() {
737 let deleted_ids = if let Some(o) = items.get(2) {
738 from_redis_value(o)?
739 } else {
740 Vec::new()
741 };
742
743 let claimed: Vec<StreamId> = match &items[1] {
744 Value::Array(x)
746 if matches!(x.first(), None | Some(Value::BulkString(_))) =>
747 {
748 let ids: Vec<String> = from_redis_value(&items[1])?;
749
750 ids.into_iter()
751 .map(|id| StreamId {
752 id,
753 ..Default::default()
754 })
755 .collect()
756 }
757 Value::Array(x) if matches!(x.first(), Some(Value::Array(_))) => {
759 let rows: SACRows = from_redis_value(&items[1])?;
760
761 rows.into_iter()
762 .flat_map(|id_row| {
763 id_row.into_iter().map(|(id, map)| StreamId { id, map })
764 })
765 .collect()
766 }
767 _ => invalid_type_error!("Incorrect type", &items[1]),
768 };
769
770 Ok(Self {
771 next_stream_id: from_redis_value(&items[0])?,
772 claimed,
773 deleted_ids,
774 })
775 } else {
776 invalid_type_error!("Wrong number of entries in array response", v)
777 }
778 }
779 _ => invalid_type_error!("Not a array response", v),
780 }
781 }
782}
783
784type SRRows = Vec<HashMap<String, Vec<HashMap<String, HashMap<String, Value>>>>>;
785impl FromRedisValue for StreamReadReply {
786 fn from_redis_value(v: &Value) -> RedisResult<Self> {
787 let rows: SRRows = from_redis_value(v)?;
788 let keys = rows
789 .into_iter()
790 .flat_map(|row| {
791 row.into_iter().map(|(key, entry)| {
792 let ids = entry
793 .into_iter()
794 .flat_map(|id_row| id_row.into_iter().map(|(id, map)| StreamId { id, map }))
795 .collect();
796 StreamKey { key, ids }
797 })
798 })
799 .collect();
800 Ok(StreamReadReply { keys })
801 }
802}
803
804impl FromRedisValue for StreamRangeReply {
805 fn from_redis_value(v: &Value) -> RedisResult<Self> {
806 let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
807 let ids: Vec<StreamId> = rows
808 .into_iter()
809 .flat_map(|row| row.into_iter().map(|(id, map)| StreamId { id, map }))
810 .collect();
811 Ok(StreamRangeReply { ids })
812 }
813}
814
815impl FromRedisValue for StreamClaimReply {
816 fn from_redis_value(v: &Value) -> RedisResult<Self> {
817 let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
818 let ids: Vec<StreamId> = rows
819 .into_iter()
820 .flat_map(|row| row.into_iter().map(|(id, map)| StreamId { id, map }))
821 .collect();
822 Ok(StreamClaimReply { ids })
823 }
824}
825
826type SPRInner = (
827 usize,
828 Option<String>,
829 Option<String>,
830 Vec<Option<(String, String)>>,
831);
832impl FromRedisValue for StreamPendingReply {
833 fn from_redis_value(v: &Value) -> RedisResult<Self> {
834 let (count, start, end, consumer_data): SPRInner = from_redis_value(v)?;
835
836 if count == 0 {
837 Ok(StreamPendingReply::Empty)
838 } else {
839 let mut result = StreamPendingData::default();
840
841 let start_id = start
842 .ok_or_else(|| Error::other("IllegalState: Non-zero pending expects start id"))?;
843
844 let end_id =
845 end.ok_or_else(|| Error::other("IllegalState: Non-zero pending expects end id"))?;
846
847 result.count = count;
848 result.start_id = start_id;
849 result.end_id = end_id;
850
851 result.consumers = consumer_data
852 .into_iter()
853 .flatten()
854 .map(|(name, pending)| StreamInfoConsumer {
855 name,
856 pending: pending.parse().unwrap_or_default(),
857 ..Default::default()
858 })
859 .collect();
860
861 Ok(StreamPendingReply::Data(result))
862 }
863 }
864}
865
866impl FromRedisValue for StreamPendingCountReply {
867 fn from_redis_value(v: &Value) -> RedisResult<Self> {
868 let mut reply = StreamPendingCountReply::default();
869 match v {
870 Value::Array(outer_tuple) => {
871 for outer in outer_tuple {
872 match outer {
873 Value::Array(inner_tuple) => match &inner_tuple[..] {
874 [Value::BulkString(id_bytes), Value::BulkString(consumer_bytes), Value::Int(last_delivered_ms_u64), Value::Int(times_delivered_u64)] =>
875 {
876 let id = String::from_utf8(id_bytes.to_vec())?;
877 let consumer = String::from_utf8(consumer_bytes.to_vec())?;
878 let last_delivered_ms = *last_delivered_ms_u64 as usize;
879 let times_delivered = *times_delivered_u64 as usize;
880 reply.ids.push(StreamPendingId {
881 id,
882 consumer,
883 last_delivered_ms,
884 times_delivered,
885 });
886 }
887 _ => fail!((
888 crate::types::ErrorKind::TypeError,
889 "Cannot parse redis data (3)"
890 )),
891 },
892 _ => fail!((
893 crate::types::ErrorKind::TypeError,
894 "Cannot parse redis data (2)"
895 )),
896 }
897 }
898 }
899 _ => fail!((
900 crate::types::ErrorKind::TypeError,
901 "Cannot parse redis data (1)"
902 )),
903 };
904 Ok(reply)
905 }
906}
907
908impl FromRedisValue for StreamInfoStreamReply {
909 fn from_redis_value(v: &Value) -> RedisResult<Self> {
910 let map: HashMap<String, Value> = from_redis_value(v)?;
911 let mut reply = StreamInfoStreamReply::default();
912 if let Some(v) = &map.get("last-generated-id") {
913 reply.last_generated_id = from_redis_value(v)?;
914 }
915 if let Some(v) = &map.get("radix-tree-nodes") {
916 reply.radix_tree_keys = from_redis_value(v)?;
917 }
918 if let Some(v) = &map.get("groups") {
919 reply.groups = from_redis_value(v)?;
920 }
921 if let Some(v) = &map.get("length") {
922 reply.length = from_redis_value(v)?;
923 }
924 if let Some(v) = &map.get("first-entry") {
925 reply.first_entry = StreamId::from_array_value(v)?;
926 }
927 if let Some(v) = &map.get("last-entry") {
928 reply.last_entry = StreamId::from_array_value(v)?;
929 }
930 Ok(reply)
931 }
932}
933
934impl FromRedisValue for StreamInfoConsumersReply {
935 fn from_redis_value(v: &Value) -> RedisResult<Self> {
936 let consumers: Vec<HashMap<String, Value>> = from_redis_value(v)?;
937 let mut reply = StreamInfoConsumersReply::default();
938 for map in consumers {
939 let mut c = StreamInfoConsumer::default();
940 if let Some(v) = &map.get("name") {
941 c.name = from_redis_value(v)?;
942 }
943 if let Some(v) = &map.get("pending") {
944 c.pending = from_redis_value(v)?;
945 }
946 if let Some(v) = &map.get("idle") {
947 c.idle = from_redis_value(v)?;
948 }
949 reply.consumers.push(c);
950 }
951
952 Ok(reply)
953 }
954}
955
956impl FromRedisValue for StreamInfoGroupsReply {
957 fn from_redis_value(v: &Value) -> RedisResult<Self> {
958 let groups: Vec<HashMap<String, Value>> = from_redis_value(v)?;
959 let mut reply = StreamInfoGroupsReply::default();
960 for map in groups {
961 let mut g = StreamInfoGroup::default();
962 if let Some(v) = &map.get("name") {
963 g.name = from_redis_value(v)?;
964 }
965 if let Some(v) = &map.get("pending") {
966 g.pending = from_redis_value(v)?;
967 }
968 if let Some(v) = &map.get("consumers") {
969 g.consumers = from_redis_value(v)?;
970 }
971 if let Some(v) = &map.get("last-delivered-id") {
972 g.last_delivered_id = from_redis_value(v)?;
973 }
974 if let Some(v) = &map.get("entries-read") {
975 g.entries_read = if let Value::Nil = v {
976 None
977 } else {
978 Some(from_redis_value(v)?)
979 };
980 }
981 if let Some(v) = &map.get("lag") {
982 g.lag = if let Value::Nil = v {
983 None
984 } else {
985 Some(from_redis_value(v)?)
986 };
987 }
988 reply.groups.push(g);
989 }
990 Ok(reply)
991 }
992}
993
994#[derive(Debug, Clone, Default)]
996pub enum StreamDeletionPolicy {
997 #[default]
999 KeepRef,
1000 DelRef,
1002 Acked,
1004}
1005
1006impl ToRedisArgs for StreamDeletionPolicy {
1007 fn write_redis_args<W>(&self, out: &mut W)
1008 where
1009 W: ?Sized + RedisWrite,
1010 {
1011 match self {
1012 StreamDeletionPolicy::KeepRef => out.write_arg(b"KEEPREF"),
1013 StreamDeletionPolicy::DelRef => out.write_arg(b"DELREF"),
1014 StreamDeletionPolicy::Acked => out.write_arg(b"ACKED"),
1015 }
1016 }
1017}
1018
1019#[cfg(feature = "streams")]
1021#[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
1022#[derive(Debug, PartialEq, Eq)]
1023pub enum XDelExStatusCode {
1024 IdNotFound = -1,
1026 Deleted = 1,
1028 NotDeletedUnacknowledgedOrStillReferenced = 2,
1031}
1032
1033#[cfg(feature = "streams")]
1034impl FromRedisValue for XDelExStatusCode {
1035 fn from_redis_value(v: &Value) -> RedisResult<Self> {
1036 match v {
1037 Value::Int(code) => match *code {
1038 -1 => Ok(XDelExStatusCode::IdNotFound),
1039 1 => Ok(XDelExStatusCode::Deleted),
1040 2 => Ok(XDelExStatusCode::NotDeletedUnacknowledgedOrStillReferenced),
1041 _ => Err(RedisError::from((
1042 ErrorKind::TypeError,
1043 "Invalid XDelExStatusCode status code",
1044 format!("Unknown status code: {code}"),
1045 ))),
1046 },
1047 _ => Err(RedisError::from((
1048 ErrorKind::TypeError,
1049 "Response type not XDelExStatusCode compatible",
1050 ))),
1051 }
1052 }
1053}
1054
1055#[cfg(feature = "streams")]
1057#[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
1058#[derive(Debug, PartialEq, Eq)]
1059pub enum XAckDelStatusCode {
1060 IdNotFound = -1,
1062 AcknowledgedAndDeleted = 1,
1064 AcknowledgedNotDeletedStillReferenced = 2,
1066}
1067
1068#[cfg(feature = "streams")]
1069impl FromRedisValue for XAckDelStatusCode {
1070 fn from_redis_value(v: &Value) -> RedisResult<Self> {
1071 match v {
1072 Value::Int(code) => match *code {
1073 -1 => Ok(XAckDelStatusCode::IdNotFound),
1074 1 => Ok(XAckDelStatusCode::AcknowledgedAndDeleted),
1075 2 => Ok(XAckDelStatusCode::AcknowledgedNotDeletedStillReferenced),
1076 _ => Err(RedisError::from((
1077 ErrorKind::TypeError,
1078 "Invalid XAckDelStatusCode status code",
1079 format!("Unknown status code: {code}"),
1080 ))),
1081 },
1082 _ => Err(RedisError::from((
1083 ErrorKind::TypeError,
1084 "Response type not XAckDelStatusCode compatible",
1085 ))),
1086 }
1087 }
1088}
1089
1090#[cfg(test)]
1091mod tests {
1092 use super::*;
1093
1094 fn assert_command_eq(object: impl ToRedisArgs, expected: &[u8]) {
1095 let mut out: Vec<Vec<u8>> = Vec::new();
1096
1097 object.write_redis_args(&mut out);
1098
1099 let mut cmd: Vec<u8> = Vec::new();
1100
1101 out.iter_mut().for_each(|item| {
1102 cmd.append(item);
1103 cmd.push(b' ');
1104 });
1105
1106 cmd.pop();
1107
1108 assert_eq!(cmd, expected);
1109 }
1110
1111 mod stream_auto_claim_reply {
1112 use super::*;
1113 use crate::Value;
1114
1115 #[test]
1116 fn short_response() {
1117 let value = Value::Array(vec![Value::BulkString("1713465536578-0".into())]);
1118
1119 let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1120
1121 assert!(reply.is_err());
1122 }
1123
1124 #[test]
1125 fn parses_none_claimed_response() {
1126 let value = Value::Array(vec![
1127 Value::BulkString("0-0".into()),
1128 Value::Array(vec![]),
1129 Value::Array(vec![]),
1130 ]);
1131
1132 let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1133
1134 assert!(reply.is_ok());
1135
1136 let reply = reply.unwrap();
1137
1138 assert_eq!(reply.next_stream_id.as_str(), "0-0");
1139 assert_eq!(reply.claimed.len(), 0);
1140 assert_eq!(reply.deleted_ids.len(), 0);
1141 }
1142
1143 #[test]
1144 fn parses_response() {
1145 let value = Value::Array(vec![
1146 Value::BulkString("1713465536578-0".into()),
1147 Value::Array(vec![
1148 Value::Array(vec![
1149 Value::BulkString("1713465533411-0".into()),
1150 Value::Array(vec![
1152 Value::BulkString("name".into()),
1153 Value::BulkString("test".into()),
1154 Value::BulkString("other".into()),
1155 Value::BulkString("whaterver".into()),
1156 ]),
1157 ]),
1158 Value::Array(vec![
1159 Value::BulkString("1713465536069-0".into()),
1160 Value::Array(vec![
1161 Value::BulkString("name".into()),
1162 Value::BulkString("another test".into()),
1163 Value::BulkString("other".into()),
1164 Value::BulkString("something".into()),
1165 ]),
1166 ]),
1167 ]),
1168 Value::Array(vec![Value::BulkString("123456789-0".into())]),
1169 ]);
1170
1171 let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1172
1173 assert!(reply.is_ok());
1174
1175 let reply = reply.unwrap();
1176
1177 assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1178 assert_eq!(reply.claimed.len(), 2);
1179 assert_eq!(reply.claimed[0].id.as_str(), "1713465533411-0");
1180 assert!(
1181 matches!(reply.claimed[0].map.get("name"), Some(Value::BulkString(v)) if v == "test".as_bytes())
1182 );
1183 assert_eq!(reply.claimed[1].id.as_str(), "1713465536069-0");
1184 assert_eq!(reply.deleted_ids.len(), 1);
1185 assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
1186 }
1187
1188 #[test]
1189 fn parses_v6_response() {
1190 let value = Value::Array(vec![
1191 Value::BulkString("1713465536578-0".into()),
1192 Value::Array(vec![
1193 Value::Array(vec![
1194 Value::BulkString("1713465533411-0".into()),
1195 Value::Array(vec![
1196 Value::BulkString("name".into()),
1197 Value::BulkString("test".into()),
1198 Value::BulkString("other".into()),
1199 Value::BulkString("whaterver".into()),
1200 ]),
1201 ]),
1202 Value::Array(vec![
1203 Value::BulkString("1713465536069-0".into()),
1204 Value::Array(vec![
1205 Value::BulkString("name".into()),
1206 Value::BulkString("another test".into()),
1207 Value::BulkString("other".into()),
1208 Value::BulkString("something".into()),
1209 ]),
1210 ]),
1211 ]),
1212 ]);
1214
1215 let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1216
1217 assert!(reply.is_ok());
1218
1219 let reply = reply.unwrap();
1220
1221 assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1222 assert_eq!(reply.claimed.len(), 2);
1223 let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1224 assert!(ids.contains(&"1713465533411-0"));
1225 assert!(ids.contains(&"1713465536069-0"));
1226 assert_eq!(reply.deleted_ids.len(), 0);
1227 }
1228
1229 #[test]
1230 fn parses_justid_response() {
1231 let value = Value::Array(vec![
1232 Value::BulkString("1713465536578-0".into()),
1233 Value::Array(vec![
1234 Value::BulkString("1713465533411-0".into()),
1235 Value::BulkString("1713465536069-0".into()),
1236 ]),
1237 Value::Array(vec![Value::BulkString("123456789-0".into())]),
1238 ]);
1239
1240 let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1241
1242 assert!(reply.is_ok());
1243
1244 let reply = reply.unwrap();
1245
1246 assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1247 assert_eq!(reply.claimed.len(), 2);
1248 let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1249 assert!(ids.contains(&"1713465533411-0"));
1250 assert!(ids.contains(&"1713465536069-0"));
1251 assert_eq!(reply.deleted_ids.len(), 1);
1252 assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
1253 }
1254
1255 #[test]
1256 fn parses_v6_justid_response() {
1257 let value = Value::Array(vec![
1258 Value::BulkString("1713465536578-0".into()),
1259 Value::Array(vec![
1260 Value::BulkString("1713465533411-0".into()),
1261 Value::BulkString("1713465536069-0".into()),
1262 ]),
1263 ]);
1265
1266 let reply: RedisResult<StreamAutoClaimReply> = FromRedisValue::from_redis_value(&value);
1267
1268 assert!(reply.is_ok());
1269
1270 let reply = reply.unwrap();
1271
1272 assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
1273 assert_eq!(reply.claimed.len(), 2);
1274 let ids: Vec<_> = reply.claimed.iter().map(|e| e.id.as_str()).collect();
1275 assert!(ids.contains(&"1713465533411-0"));
1276 assert!(ids.contains(&"1713465536069-0"));
1277 assert_eq!(reply.deleted_ids.len(), 0);
1278 }
1279 }
1280
1281 mod stream_trim_options {
1282 use super::*;
1283
1284 #[test]
1285 fn maxlen_trim() {
1286 let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10);
1287
1288 assert_command_eq(options, b"MAXLEN ~ 10");
1289 }
1290
1291 #[test]
1292 fn maxlen_exact_trim() {
1293 let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Exact, 10);
1294
1295 assert_command_eq(options, b"MAXLEN = 10");
1296 }
1297
1298 #[test]
1299 fn maxlen_trim_limit() {
1300 let options = StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10).limit(5);
1301
1302 assert_command_eq(options, b"MAXLEN ~ 10 LIMIT 5");
1303 }
1304 #[test]
1305 fn minid_trim_limit() {
1306 let options = StreamTrimOptions::minid(StreamTrimmingMode::Exact, "123456-7").limit(5);
1307
1308 assert_command_eq(options, b"MINID = 123456-7 LIMIT 5");
1309 }
1310 }
1311
1312 mod stream_add_options {
1313 use super::*;
1314
1315 #[test]
1316 fn the_default() {
1317 let options = StreamAddOptions::default();
1318
1319 assert_command_eq(options, b"");
1320 }
1321
1322 #[test]
1323 fn with_maxlen_trim() {
1324 let options = StreamAddOptions::default()
1325 .trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10));
1326
1327 assert_command_eq(options, b"MAXLEN = 10");
1328 }
1329
1330 #[test]
1331 fn with_nomkstream() {
1332 let options = StreamAddOptions::default().nomkstream();
1333
1334 assert_command_eq(options, b"NOMKSTREAM");
1335 }
1336
1337 #[test]
1338 fn with_nomkstream_and_maxlen_trim() {
1339 let options = StreamAddOptions::default()
1340 .nomkstream()
1341 .trim(StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10));
1342
1343 assert_command_eq(options, b"NOMKSTREAM MAXLEN = 10");
1344 }
1345 }
1346}