1#[cfg(feature = "streams")]
4use crate::{
5 FromRedisValue, RedisWrite, ToRedisArgs, Value,
6 errors::{ParsingError, invalid_type_error},
7 types::HashMap,
8};
9use crate::{from_redis_value, from_redis_value_ref, types::ToSingleRedisArg};
10
/// Length cap that is serialized as a `MAXLEN` clause.
///
/// The [`ToRedisArgs`] implementation writes `MAXLEN`, a modifier (`=` or `~`)
/// and the wrapped number, in that order.
#[derive(PartialEq, Eq, Clone, Debug, Copy)]
#[non_exhaustive]
pub enum StreamMaxlen {
    /// Serialized with the `=` modifier.
    Equals(usize),
    /// Serialized with the `~` modifier.
    Approx(usize),
}
24
25impl ToRedisArgs for StreamMaxlen {
26 fn write_redis_args<W>(&self, out: &mut W)
27 where
28 W: ?Sized + RedisWrite,
29 {
30 let (ch, val) = match *self {
31 StreamMaxlen::Equals(v) => ("=", v),
32 StreamMaxlen::Approx(v) => ("~", v),
33 };
34 out.write_arg(b"MAXLEN");
35 out.write_arg(ch.as_bytes());
36 val.write_redis_args(out);
37 }
38}
39
/// Modifier that accompanies a trimming threshold.
///
/// Serialized as a single argument: `=` for [`StreamTrimmingMode::Exact`] and
/// `~` for [`StreamTrimmingMode::Approx`].
#[derive(Debug)]
#[non_exhaustive]
pub enum StreamTrimmingMode {
    /// Written as `=`.
    Exact,
    /// Written as `~`.
    Approx,
}
50
51impl ToRedisArgs for StreamTrimmingMode {
52 fn write_redis_args<W>(&self, out: &mut W)
53 where
54 W: ?Sized + RedisWrite,
55 {
56 match self {
57 Self::Exact => out.write_arg(b"="),
58 Self::Approx => out.write_arg(b"~"),
59 };
60 }
61}
62
/// Trimming clause: a keyword, a trimming mode, a threshold and an optional limit.
///
/// Serialized as `MAXLEN <mode> <count> [LIMIT <n>]` or
/// `MINID <mode> <id> [LIMIT <n>]`.
#[derive(Debug)]
#[non_exhaustive]
pub enum StreamTrimStrategy {
    /// `MAXLEN` clause: mode, numeric threshold, optional `LIMIT` value.
    MaxLen(StreamTrimmingMode, usize, Option<usize>),
    /// `MINID` clause: mode, stream id threshold, optional `LIMIT` value.
    MinId(StreamTrimmingMode, String, Option<usize>),
}
74
75impl StreamTrimStrategy {
76 pub fn maxlen(trim: StreamTrimmingMode, max_entries: usize) -> Self {
78 Self::MaxLen(trim, max_entries, None)
79 }
80
81 pub fn minid(trim: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
83 Self::MinId(trim, stream_id.into(), None)
84 }
85
86 pub fn limit(self, limit: usize) -> Self {
88 match self {
89 StreamTrimStrategy::MaxLen(m, t, _) => StreamTrimStrategy::MaxLen(m, t, Some(limit)),
90 StreamTrimStrategy::MinId(m, t, _) => StreamTrimStrategy::MinId(m, t, Some(limit)),
91 }
92 }
93}
94
95impl ToRedisArgs for StreamTrimStrategy {
96 fn write_redis_args<W>(&self, out: &mut W)
97 where
98 W: ?Sized + RedisWrite,
99 {
100 let limit = match self {
101 StreamTrimStrategy::MaxLen(m, t, limit) => {
102 out.write_arg(b"MAXLEN");
103 m.write_redis_args(out);
104 t.write_redis_args(out);
105 limit
106 }
107 StreamTrimStrategy::MinId(m, t, limit) => {
108 out.write_arg(b"MINID");
109 m.write_redis_args(out);
110 t.write_redis_args(out);
111 limit
112 }
113 };
114 if let Some(limit) = limit {
115 out.write_arg(b"LIMIT");
116 limit.write_redis_args(out);
117 }
118 }
119}
120
/// Builder holding a trimming strategy and an optional deletion policy.
///
/// When serialized, the strategy is written first and the deletion policy
/// (if any) after it.
#[derive(Debug)]
pub struct StreamTrimOptions {
    // Trimming clause; always present.
    strategy: StreamTrimStrategy,
    // Written after the strategy when set.
    deletion_policy: Option<StreamDeletionPolicy>,
}
130
131impl StreamTrimOptions {
132 pub fn maxlen(mode: StreamTrimmingMode, max_entries: usize) -> Self {
134 Self {
135 strategy: StreamTrimStrategy::maxlen(mode, max_entries),
136 deletion_policy: None,
137 }
138 }
139
140 pub fn minid(mode: StreamTrimmingMode, stream_id: impl Into<String>) -> Self {
142 Self {
143 strategy: StreamTrimStrategy::minid(mode, stream_id),
144 deletion_policy: None,
145 }
146 }
147
148 pub fn limit(mut self, limit: usize) -> Self {
150 self.strategy = self.strategy.limit(limit);
151 self
152 }
153
154 pub fn set_deletion_policy(mut self, deletion_policy: StreamDeletionPolicy) -> Self {
156 self.deletion_policy = Some(deletion_policy);
157 self
158 }
159}
160
161impl ToRedisArgs for StreamTrimOptions {
162 fn write_redis_args<W>(&self, out: &mut W)
163 where
164 W: ?Sized + RedisWrite,
165 {
166 self.strategy.write_redis_args(out);
167 if let Some(deletion_policy) = self.deletion_policy.as_ref() {
168 deletion_policy.write_redis_args(out);
169 }
170 }
171}
172
/// Builder for the optional arguments used when adding a stream entry.
///
/// The default value serializes to no arguments at all.
// NOTE(review): the name suggests these are the `XADD` options — confirm against the caller.
#[derive(Default, Debug)]
pub struct StreamAddOptions {
    // When true, `NOMKSTREAM` is written.
    nomkstream: bool,
    // Optional trimming clause, written after `NOMKSTREAM`.
    trim: Option<StreamTrimStrategy>,
    // Optional deletion policy, written last.
    deletion_policy: Option<StreamDeletionPolicy>,
}
183
184impl StreamAddOptions {
185 pub fn nomkstream(mut self) -> Self {
187 self.nomkstream = true;
188 self
189 }
190
191 pub fn trim(mut self, trim: StreamTrimStrategy) -> Self {
193 self.trim = Some(trim);
194 self
195 }
196
197 pub fn set_deletion_policy(mut self, deletion_policy: StreamDeletionPolicy) -> Self {
199 self.deletion_policy = Some(deletion_policy);
200 self
201 }
202}
203
204impl ToRedisArgs for StreamAddOptions {
205 fn write_redis_args<W>(&self, out: &mut W)
206 where
207 W: ?Sized + RedisWrite,
208 {
209 if self.nomkstream {
210 out.write_arg(b"NOMKSTREAM");
211 }
212 if let Some(strategy) = self.trim.as_ref() {
213 strategy.write_redis_args(out);
214 }
215 if let Some(deletion_policy) = self.deletion_policy.as_ref() {
216 deletion_policy.write_redis_args(out);
217 }
218 }
219}
220
/// Builder for the optional `COUNT` and `JUSTID` arguments of an auto-claim call.
///
/// The default value serializes to no arguments.
#[derive(Default, Debug)]
pub struct StreamAutoClaimOptions {
    // Written as `COUNT <n>` when set.
    count: Option<usize>,
    // When true, `JUSTID` is written.
    justid: bool,
}
230
231impl StreamAutoClaimOptions {
232 pub fn count(mut self, n: usize) -> Self {
234 self.count = Some(n);
235 self
236 }
237
238 pub fn with_justid(mut self) -> Self {
241 self.justid = true;
242 self
243 }
244}
245
246impl ToRedisArgs for StreamAutoClaimOptions {
247 fn write_redis_args<W>(&self, out: &mut W)
248 where
249 W: ?Sized + RedisWrite,
250 {
251 if let Some(ref count) = self.count {
252 out.write_arg(b"COUNT");
253 out.write_arg(format!("{count}").as_bytes());
254 }
255 if self.justid {
256 out.write_arg(b"JUSTID");
257 }
258 }
259}
260
/// Builder for the optional arguments of a claim call.
///
/// Each configured field is serialized as a keyword (followed by its value
/// where applicable); the default value serializes to no arguments.
#[derive(Default, Debug)]
pub struct StreamClaimOptions {
    // Written as `IDLE <ms>` when set.
    idle: Option<usize>,
    // Written as `TIME <ms>` when set.
    time: Option<usize>,
    // Written as `RETRYCOUNT <n>` when set.
    retry: Option<usize>,
    // When true, `FORCE` is written.
    force: bool,
    // When true, `JUSTID` is written.
    justid: bool,
    // Written as `LASTID <id>` when set.
    lastid: Option<String>,
}
281
282impl StreamClaimOptions {
283 pub fn idle(mut self, ms: usize) -> Self {
285 self.idle = Some(ms);
286 self
287 }
288
289 pub fn time(mut self, ms_time: usize) -> Self {
291 self.time = Some(ms_time);
292 self
293 }
294
295 pub fn retry(mut self, count: usize) -> Self {
297 self.retry = Some(count);
298 self
299 }
300
301 pub fn with_force(mut self) -> Self {
303 self.force = true;
304 self
305 }
306
307 pub fn with_justid(mut self) -> Self {
310 self.justid = true;
311 self
312 }
313
314 pub fn with_lastid(mut self, lastid: impl Into<String>) -> Self {
316 self.lastid = Some(lastid.into());
317 self
318 }
319}
320
321impl ToRedisArgs for StreamClaimOptions {
322 fn write_redis_args<W>(&self, out: &mut W)
323 where
324 W: ?Sized + RedisWrite,
325 {
326 if let Some(ref ms) = self.idle {
327 out.write_arg(b"IDLE");
328 out.write_arg(format!("{ms}").as_bytes());
329 }
330 if let Some(ref ms_time) = self.time {
331 out.write_arg(b"TIME");
332 out.write_arg(format!("{ms_time}").as_bytes());
333 }
334 if let Some(ref count) = self.retry {
335 out.write_arg(b"RETRYCOUNT");
336 out.write_arg(format!("{count}").as_bytes());
337 }
338 if self.force {
339 out.write_arg(b"FORCE");
340 }
341 if self.justid {
342 out.write_arg(b"JUSTID");
343 }
344 if let Some(ref lastid) = self.lastid {
345 out.write_arg(b"LASTID");
346 lastid.write_redis_args(out);
347 }
348 }
349}
350
/// Optional pair of (group name, consumer name), each stored as the argument
/// bytes produced by `ToRedisArgs::to_redis_args`.
type SRGroup = Option<(Vec<Vec<u8>>, Vec<Vec<u8>>)>;
/// Builder for the optional arguments of a stream read call.
///
/// Serialization order is `GROUP`, `BLOCK`, `COUNT`, then `NOACK` and `CLAIM`;
/// the last two are written only when a group is configured.
#[derive(Default, Debug)]
pub struct StreamReadOptions {
    // Written as `BLOCK <ms>` when set.
    block: Option<usize>,
    // Written as `COUNT <n>` when set.
    count: Option<usize>,
    // `NOACK` is written only when this is `Some(true)` and a group is set.
    noack: Option<bool>,
    // Written as `GROUP <group> <consumer>` when set.
    group: SRGroup,
    // Written as `CLAIM <min-idle-time>` when set and a group is set.
    claim: Option<usize>,
}
374
375impl StreamReadOptions {
376 pub fn read_only(&self) -> bool {
379 self.group.is_none()
380 }
381
382 pub fn noack(mut self) -> Self {
386 self.noack = Some(true);
387 self
388 }
389
390 pub fn block(mut self, ms: usize) -> Self {
392 self.block = Some(ms);
393 self
394 }
395
396 pub fn count(mut self, n: usize) -> Self {
398 self.count = Some(n);
399 self
400 }
401
402 pub fn group<GN: ToRedisArgs, CN: ToRedisArgs>(
404 mut self,
405 group_name: GN,
406 consumer_name: CN,
407 ) -> Self {
408 self.group = Some((
409 ToRedisArgs::to_redis_args(&group_name),
410 ToRedisArgs::to_redis_args(&consumer_name),
411 ));
412 self
413 }
414
415 pub fn claim(mut self, min_idle_time: usize) -> Self {
417 self.claim = Some(min_idle_time);
418 self
419 }
420}
421
422impl ToRedisArgs for StreamReadOptions {
423 fn write_redis_args<W>(&self, out: &mut W)
424 where
425 W: ?Sized + RedisWrite,
426 {
427 if let Some(ref group) = self.group {
428 out.write_arg(b"GROUP");
429 for i in &group.0 {
430 out.write_arg(i);
431 }
432 for i in &group.1 {
433 out.write_arg(i);
434 }
435 }
436
437 if let Some(ref ms) = self.block {
438 out.write_arg(b"BLOCK");
439 out.write_arg(format!("{ms}").as_bytes());
440 }
441
442 if let Some(ref n) = self.count {
443 out.write_arg(b"COUNT");
444 out.write_arg(format!("{n}").as_bytes());
445 }
446
447 if self.group.is_some() {
448 if self.noack == Some(true) {
450 out.write_arg(b"NOACK");
451 }
452 if let Some(ref min_idle_time) = self.claim {
454 out.write_arg(b"CLAIM");
455 out.write_arg(format!("{min_idle_time}").as_bytes());
456 }
457 }
458 }
459}
460
/// Parsed reply of an auto-claim call.
///
/// Built from an array of two or three items: the next stream id, the claimed
/// entries and, when present, the deleted ids.
#[derive(Default, Debug, Clone)]
pub struct StreamAutoClaimReply {
    /// First item of the reply.
    pub next_stream_id: String,
    /// Claimed entries; entries parsed from an id-only reply carry an empty map.
    pub claimed: Vec<StreamId>,
    /// Third item of the reply, or empty when the reply has only two items.
    pub deleted_ids: Vec<String>,
    /// `true` when the claimed list in the reply held more elements than were
    /// converted into [`StreamId`]s (for example `Nil` placeholders).
    pub invalid_entries: bool,
}
478
/// Parsed reply of a stream read call: one [`StreamKey`] per stream key found
/// in the reply.
#[derive(Default, Debug, Clone)]
pub struct StreamReadReply {
    /// Stream keys together with the entries returned for each of them.
    pub keys: Vec<StreamKey>,
}
489
/// Parsed reply of a range-style call: a flat list of stream entries.
#[derive(Default, Debug, Clone)]
pub struct StreamRangeReply {
    /// Entries in reply order, each with its id and field map.
    pub ids: Vec<StreamId>,
}
506
/// Parsed reply of a claim call: a flat list of stream entries.
#[derive(Default, Debug, Clone)]
pub struct StreamClaimReply {
    /// Entries in reply order, each with its id and field map.
    pub ids: Vec<StreamId>,
}
518
/// Parsed summary reply of a pending call.
///
/// The parser yields [`StreamPendingReply::Empty`] when the reported count is
/// zero and [`StreamPendingReply::Data`] otherwise.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub enum StreamPendingReply {
    /// The reply reported a pending count of zero.
    #[default]
    Empty,
    /// The reply reported a non-zero count together with its details.
    Data(StreamPendingData),
}
535
536impl StreamPendingReply {
537 pub fn count(&self) -> usize {
539 match self {
540 StreamPendingReply::Empty => 0,
541 StreamPendingReply::Data(x) => x.count,
542 }
543 }
544}
545
/// Details carried by a non-empty [`StreamPendingReply`].
#[derive(Default, Debug, Clone)]
pub struct StreamPendingData {
    /// Pending count reported by the reply (non-zero when built by the parser).
    pub count: usize,
    /// Second element of the reply.
    pub start_id: String,
    /// Third element of the reply.
    pub end_id: String,
    /// Per-consumer rows; the parser fills `name` and `pending` and leaves the
    /// remaining fields at their defaults.
    pub consumers: Vec<StreamInfoConsumer>,
}
562
/// Parsed detailed reply of a pending call: one [`StreamPendingId`] per row.
#[derive(Default, Debug, Clone)]
pub struct StreamPendingCountReply {
    /// Rows in reply order.
    pub ids: Vec<StreamPendingId>,
}
579
/// Parsed reply describing a stream.
///
/// The parser reads the reply as a key/value map; any key missing from the
/// reply leaves the corresponding field at its default.
#[derive(Default, Debug, Clone)]
pub struct StreamInfoStreamReply {
    /// Value of the `last-generated-id` key.
    pub last_generated_id: String,
    /// Radix tree counter of the stream.
    // NOTE(review): presumably the number of radix tree keys, as the field name
    // suggests — confirm against the key read by the parser.
    pub radix_tree_keys: usize,
    /// Value of the `groups` key.
    pub groups: usize,
    /// Value of the `length` key.
    pub length: usize,
    /// Entry parsed from the `first-entry` key.
    pub first_entry: StreamId,
    /// Entry parsed from the `last-entry` key.
    pub last_entry: StreamId,
}
605
/// Parsed reply listing the consumers of a group.
#[derive(Default, Debug, Clone)]
pub struct StreamInfoConsumersReply {
    /// One entry per map in the reply, in reply order.
    pub consumers: Vec<StreamInfoConsumer>,
}
616
/// Parsed reply listing the consumer groups of a stream.
#[derive(Default, Debug, Clone)]
pub struct StreamInfoGroupsReply {
    /// One entry per map in the reply, in reply order.
    pub groups: Vec<StreamInfoGroup>,
}
629
/// Information about a single consumer.
#[derive(Default, Debug, Clone)]
pub struct StreamInfoConsumer {
    /// Consumer name.
    pub name: String,
    /// Pending count reported for this consumer.
    pub pending: usize,
    /// Value of the `idle` key; stays `0` when the source reply has no such key.
    // NOTE(review): presumably milliseconds — confirm against the server docs.
    pub idle: usize,
}
643
/// Information about a single consumer group.
#[derive(Default, Debug, Clone)]
pub struct StreamInfoGroup {
    /// Value of the `name` key.
    pub name: String,
    /// Value of the `consumers` key.
    pub consumers: usize,
    /// Value of the `pending` key.
    pub pending: usize,
    /// Value of the `last-delivered-id` key.
    pub last_delivered_id: String,
    /// Value of the `entries-read` key; `None` when the key is absent or `Nil`.
    pub entries_read: Option<usize>,
    /// Value of the `lag` key; `None` when the key is absent or `Nil`.
    pub lag: Option<usize>,
}
665
/// One row of a [`StreamPendingCountReply`], parsed from a four-element array
/// of two bulk strings followed by two integers.
#[derive(Default, Debug, Clone)]
pub struct StreamPendingId {
    /// First element of the row.
    pub id: String,
    /// Second element of the row.
    pub consumer: String,
    /// Third element of the row.
    // NOTE(review): presumably milliseconds since the last delivery — confirm.
    pub last_delivered_ms: usize,
    /// Fourth element of the row.
    pub times_delivered: usize,
}
683
/// A stream key together with the entries returned for it.
#[derive(Default, Debug, Clone)]
pub struct StreamKey {
    /// The stream key.
    pub key: String,
    /// Entries returned for this key.
    pub ids: Vec<StreamId>,
}
692
/// A single stream entry: its id plus its field/value map.
#[derive(Default, Debug, Clone, PartialEq)]
pub struct StreamId {
    /// The entry id.
    pub id: String,
    /// Field name to raw value.
    pub map: HashMap<String, Value>,
    /// Set only when the reply row carries this extra value (claim-style read
    /// rows); `None` for every other parser in this file.
    pub milliseconds_elapsed_from_delivery: Option<usize>,
    /// Set only when the reply row carries this extra value (claim-style read
    /// rows); `None` for every other parser in this file.
    pub delivered_count: Option<usize>,
}
706
707impl StreamId {
708 fn from_array_value(v: Value) -> Result<Self, ParsingError> {
710 let mut stream_id = StreamId::default();
711 if let Value::Array(mut values) = v {
712 if let Some(v) = values.first_mut() {
713 stream_id.id = from_redis_value(std::mem::take(v))?;
714 }
715 if let Some(v) = values.first_mut() {
716 stream_id.map = from_redis_value(std::mem::take(v))?;
717 }
718 }
719
720 Ok(stream_id)
721 }
722
723 pub fn get<T: FromRedisValue>(&self, key: &str) -> Option<T> {
726 match self.map.get(key) {
727 Some(x) => from_redis_value_ref(x).ok(),
728 None => None,
729 }
730 }
731
732 pub fn contains_key(&self, key: &str) -> bool {
734 self.map.contains_key(key)
735 }
736
737 pub fn len(&self) -> usize {
739 self.map.len()
740 }
741
742 pub fn is_empty(&self) -> bool {
744 self.len() == 0
745 }
746}
747
/// Shape of the claimed list when full entries are returned: one single-entry
/// map (`id -> fields`) per claimed entry.
type SACRows = Vec<HashMap<String, HashMap<String, Value>>>;

impl FromRedisValue for StreamAutoClaimReply {
    /// Parses an array of two or three items: the next stream id, the claimed
    /// entries and, optionally, the deleted ids.
    ///
    /// The claimed list may hold bulk-string ids or full entries; which parser
    /// is used is decided by the first non-`Nil` element.
    ///
    /// # Errors
    ///
    /// Fails when the value is not an array, has fewer than two or more than
    /// three items, or when an item cannot be converted to the expected type.
    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
        let Value::Array(mut items) = v else {
            invalid_type_error!("Not a array response", v);
        };

        if items.len() > 3 || items.len() < 2 {
            invalid_type_error!("Incorrect number of items", &items);
        }

        // Items are popped from the back: an optional third item (deleted
        // ids), then the claimed list, then the next stream id.
        let deleted_ids = if items.len() == 3 {
            from_redis_value(items.pop().unwrap())?
        } else {
            Vec::new()
        };
        // The length check above guarantees that both pops succeed.
        let claimed = items.pop().unwrap();
        let next_stream_id = from_redis_value(items.pop().unwrap())?;

        let Value::Array(arr) = &claimed else {
            invalid_type_error!("Incorrect type", claimed)
        };
        // With no non-Nil element there is nothing to parse; a non-empty list
        // made only of Nil values is reported through `invalid_entries`.
        let Some(entry) = arr.iter().find(|val| !matches!(val, Value::Nil)) else {
            return Ok(Self {
                next_stream_id,
                claimed: Vec::new(),
                deleted_ids,
                invalid_entries: !arr.is_empty(),
            });
        };
        let (claimed, invalid_entries) = match entry {
            // Id-only form: every element is a bulk-string id (or Nil).
            Value::BulkString(_) => {
                let claimed_count = arr.len();
                let ids: Vec<Option<String>> = from_redis_value(claimed)?;

                let claimed: Vec<_> = ids
                    .into_iter()
                    .filter_map(|id| {
                        id.map(|id| StreamId {
                            id,
                            ..Default::default()
                        })
                    })
                    .collect();
                // Fewer parsed ids than raw elements means some were dropped.
                let invalid_entries = claimed.len() < claimed_count;
                (claimed, invalid_entries)
            }
            // Full form: every element is an `[id, fields]` entry.
            Value::Array(_) => {
                let claimed_count = arr.len();
                let rows: SACRows = from_redis_value(claimed)?;

                let claimed: Vec<_> = rows
                    .into_iter()
                    .flat_map(|row| {
                        row.into_iter().map(|(id, map)| StreamId {
                            id,
                            map,
                            milliseconds_elapsed_from_delivery: None,
                            delivered_count: None,
                        })
                    })
                    .collect();
                // Fewer parsed entries than raw elements means some were dropped.
                let invalid_entries = claimed.len() < claimed_count;
                (claimed, invalid_entries)
            }
            _ => invalid_type_error!("Incorrect type", claimed),
        };

        Ok(Self {
            next_stream_id,
            claimed,
            deleted_ids,
            invalid_entries,
        })
    }
}
830
831type SRRows = Vec<HashMap<String, Vec<HashMap<String, HashMap<String, Value>>>>>;
832type SRClaimRows =
833 Vec<HashMap<String, Vec<(String, HashMap<String, Value>, Option<usize>, Option<usize>)>>>;
834
835impl FromRedisValue for StreamReadReply {
836 fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
837 if let Ok(rows) = from_redis_value::<SRRows>(v.clone()) {
839 return Ok(Self::from_standard_rows(rows));
840 }
841
842 if let Ok(rows) = from_redis_value::<SRClaimRows>(v.clone()) {
845 return Ok(Self::from_claim_rows(rows));
846 }
847
848 invalid_type_error!("Could not parse StreamReadReply in any known format", v)
849 }
850}
851
852impl StreamReadReply {
853 fn from_standard_rows(rows: SRRows) -> Self {
854 let keys = rows
855 .into_iter()
856 .flat_map(|row| {
857 row.into_iter().map(|(key, entries)| StreamKey {
858 key,
859 ids: entries
860 .into_iter()
861 .flat_map(|id_row| {
862 id_row.into_iter().map(|(id, map)| StreamId {
863 id,
864 map,
865 milliseconds_elapsed_from_delivery: None,
866 delivered_count: None,
867 })
868 })
869 .collect(),
870 })
871 })
872 .collect();
873 StreamReadReply { keys }
874 }
875
876 fn from_claim_rows(rows: SRClaimRows) -> Self {
877 let keys = rows
878 .into_iter()
879 .flat_map(|row| {
880 row.into_iter().map(|(key, entries)| StreamKey {
881 key,
882 ids: entries
883 .into_iter()
884 .map(
885 |(id, map, milliseconds_elapsed_from_delivery, delivered_count)| {
886 StreamId {
887 id,
888 map,
889 milliseconds_elapsed_from_delivery,
890 delivered_count,
891 }
892 },
893 )
894 .collect(),
895 })
896 })
897 .collect();
898 StreamReadReply { keys }
899 }
900}
901
902impl FromRedisValue for StreamRangeReply {
903 fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
904 let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
905 let ids: Vec<StreamId> = rows
906 .into_iter()
907 .flat_map(|row| {
908 row.into_iter().map(|(id, map)| StreamId {
909 id,
910 map,
911 milliseconds_elapsed_from_delivery: None,
912 delivered_count: None,
913 })
914 })
915 .collect();
916 Ok(StreamRangeReply { ids })
917 }
918}
919
920impl FromRedisValue for StreamClaimReply {
921 fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
922 let rows: Vec<HashMap<String, HashMap<String, Value>>> = from_redis_value(v)?;
923 let ids: Vec<StreamId> = rows
924 .into_iter()
925 .flat_map(|row| {
926 row.into_iter().map(|(id, map)| StreamId {
927 id,
928 map,
929 milliseconds_elapsed_from_delivery: None,
930 delivered_count: None,
931 })
932 })
933 .collect();
934 Ok(StreamClaimReply { ids })
935 }
936}
937
938type SPRInner = (
939 usize,
940 Option<String>,
941 Option<String>,
942 Vec<Option<(String, String)>>,
943);
944impl FromRedisValue for StreamPendingReply {
945 fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
946 let (count, start, end, consumer_data): SPRInner = from_redis_value(v)?;
947
948 if count == 0 {
949 Ok(StreamPendingReply::Empty)
950 } else {
951 let mut result = StreamPendingData::default();
952
953 let start_id = start.ok_or_else(|| {
954 ParsingError::from(arcstr::literal!(
955 "IllegalState: Non-zero pending expects start id"
956 ))
957 })?;
958
959 let end_id = end.ok_or_else(|| {
960 ParsingError::from(arcstr::literal!(
961 "IllegalState: Non-zero pending expects end id"
962 ))
963 })?;
964
965 result.count = count;
966 result.start_id = start_id;
967 result.end_id = end_id;
968
969 result.consumers = consumer_data
970 .into_iter()
971 .flatten()
972 .map(|(name, pending)| StreamInfoConsumer {
973 name,
974 pending: pending.parse().unwrap_or_default(),
975 ..Default::default()
976 })
977 .collect();
978
979 Ok(StreamPendingReply::Data(result))
980 }
981 }
982}
983
984impl FromRedisValue for StreamPendingCountReply {
985 fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
986 let mut reply = StreamPendingCountReply::default();
987 match v {
988 Value::Array(outer_tuple) => {
989 for outer in outer_tuple {
990 match outer {
991 Value::Array(inner_tuple) => match &inner_tuple[..] {
992 [
993 Value::BulkString(id_bytes),
994 Value::BulkString(consumer_bytes),
995 Value::Int(last_delivered_ms_u64),
996 Value::Int(times_delivered_u64),
997 ] => {
998 let id = String::from_utf8(id_bytes.to_vec())?;
999 let consumer = String::from_utf8(consumer_bytes.to_vec())?;
1000 let last_delivered_ms = *last_delivered_ms_u64 as usize;
1001 let times_delivered = *times_delivered_u64 as usize;
1002 reply.ids.push(StreamPendingId {
1003 id,
1004 consumer,
1005 last_delivered_ms,
1006 times_delivered,
1007 });
1008 }
1009 _ => fail!(ParsingError::from(arcstr::literal!(
1010 "Cannot parse redis data (3)"
1011 ))),
1012 },
1013 _ => fail!(ParsingError::from(arcstr::literal!(
1014 "Cannot parse redis data (2)"
1015 ))),
1016 }
1017 }
1018 }
1019 _ => fail!(ParsingError::from(arcstr::literal!(
1020 "Cannot parse redis data (1)"
1021 ))),
1022 };
1023 Ok(reply)
1024 }
1025}
1026
1027impl FromRedisValue for StreamInfoStreamReply {
1028 fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
1029 let mut map: HashMap<String, Value> = from_redis_value(v)?;
1030 let mut reply = StreamInfoStreamReply::default();
1031 if let Some(v) = map.remove("last-generated-id") {
1032 reply.last_generated_id = from_redis_value(v)?;
1033 }
1034 if let Some(v) = map.remove("radix-tree-nodes") {
1035 reply.radix_tree_keys = from_redis_value(v)?;
1036 }
1037 if let Some(v) = map.remove("groups") {
1038 reply.groups = from_redis_value(v)?;
1039 }
1040 if let Some(v) = map.remove("length") {
1041 reply.length = from_redis_value(v)?;
1042 }
1043 if let Some(v) = map.remove("first-entry") {
1044 reply.first_entry = StreamId::from_array_value(v)?;
1045 }
1046 if let Some(v) = map.remove("last-entry") {
1047 reply.last_entry = StreamId::from_array_value(v)?;
1048 }
1049 Ok(reply)
1050 }
1051}
1052
1053impl FromRedisValue for StreamInfoConsumersReply {
1054 fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
1055 let consumers: Vec<HashMap<String, Value>> = from_redis_value(v)?;
1056 let mut reply = StreamInfoConsumersReply::default();
1057 for mut map in consumers {
1058 let mut c = StreamInfoConsumer::default();
1059 if let Some(v) = map.remove("name") {
1060 c.name = from_redis_value(v)?;
1061 }
1062 if let Some(v) = map.remove("pending") {
1063 c.pending = from_redis_value(v)?;
1064 }
1065 if let Some(v) = map.remove("idle") {
1066 c.idle = from_redis_value(v)?;
1067 }
1068 reply.consumers.push(c);
1069 }
1070
1071 Ok(reply)
1072 }
1073}
1074
1075impl FromRedisValue for StreamInfoGroupsReply {
1076 fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
1077 let groups: Vec<HashMap<String, Value>> = from_redis_value(v)?;
1078 let mut reply = StreamInfoGroupsReply::default();
1079 for mut map in groups {
1080 let mut g = StreamInfoGroup::default();
1081 if let Some(v) = map.remove("name") {
1082 g.name = from_redis_value(v)?;
1083 }
1084 if let Some(v) = map.remove("pending") {
1085 g.pending = from_redis_value(v)?;
1086 }
1087 if let Some(v) = map.remove("consumers") {
1088 g.consumers = from_redis_value(v)?;
1089 }
1090 if let Some(v) = map.remove("last-delivered-id") {
1091 g.last_delivered_id = from_redis_value(v)?;
1092 }
1093 if let Some(v) = map.remove("entries-read") {
1094 g.entries_read = if let Value::Nil = v {
1095 None
1096 } else {
1097 Some(from_redis_value(v)?)
1098 };
1099 }
1100 if let Some(v) = map.remove("lag") {
1101 g.lag = if let Value::Nil = v {
1102 None
1103 } else {
1104 Some(from_redis_value(v)?)
1105 };
1106 }
1107 reply.groups.push(g);
1108 }
1109 Ok(reply)
1110 }
1111}
1112
/// Deletion policy keyword appended to stream commands.
///
/// Each variant is serialized as a single argument: `KEEPREF`, `DELREF` or
/// `ACKED`. The default is [`StreamDeletionPolicy::KeepRef`].
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub enum StreamDeletionPolicy {
    /// Written as `KEEPREF`.
    #[default]
    KeepRef,
    /// Written as `DELREF`.
    DelRef,
    /// Written as `ACKED`.
    Acked,
}
1125
1126impl ToRedisArgs for StreamDeletionPolicy {
1127 fn write_redis_args<W>(&self, out: &mut W)
1128 where
1129 W: ?Sized + RedisWrite,
1130 {
1131 match self {
1132 StreamDeletionPolicy::KeepRef => out.write_arg(b"KEEPREF"),
1133 StreamDeletionPolicy::DelRef => out.write_arg(b"DELREF"),
1134 StreamDeletionPolicy::Acked => out.write_arg(b"ACKED"),
1135 }
1136 }
1137}
1138impl ToSingleRedisArg for StreamDeletionPolicy {}
1139
/// Per-id status code, parsed from the integers `-1`, `1` and `2`.
// NOTE(review): the name suggests these are the `XDELEX` result codes — confirm.
#[cfg(feature = "streams")]
#[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum XDelExStatusCode {
    /// Integer code `-1`.
    IdNotFound = -1,
    /// Integer code `1`.
    Deleted = 1,
    /// Integer code `2`.
    NotDeletedUnacknowledgedOrStillReferenced = 2,
}
1154
#[cfg(feature = "streams")]
impl FromRedisValue for XDelExStatusCode {
    /// Converts an integer reply into a status code.
    ///
    /// # Errors
    ///
    /// Fails when the value is not an integer or when the integer is not one
    /// of `-1`, `1` or `2`.
    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
        match v {
            Value::Int(code) => match code {
                -1 => Ok(XDelExStatusCode::IdNotFound),
                1 => Ok(XDelExStatusCode::Deleted),
                2 => Ok(XDelExStatusCode::NotDeletedUnacknowledgedOrStillReferenced),
                _ => Err(format!("Invalid XDelExStatusCode status code: {code}").into()),
            },
            // The message names this type so the failing conversion can be
            // told apart from the `XAckDelStatusCode` one.
            _ => Err(arcstr::literal!("Response type not XDelExStatusCode compatible").into()),
        }
    }
}
1169
/// Per-id status code, parsed from the integers `-1`, `1` and `2`.
// NOTE(review): the name suggests these are the `XACKDEL` result codes — confirm.
#[cfg(feature = "streams")]
#[cfg_attr(docsrs, doc(cfg(feature = "streams")))]
#[derive(Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum XAckDelStatusCode {
    /// Integer code `-1`.
    IdNotFound = -1,
    /// Integer code `1`.
    AcknowledgedAndDeleted = 1,
    /// Integer code `2`.
    AcknowledgedNotDeletedStillReferenced = 2,
}
1183
#[cfg(feature = "streams")]
impl FromRedisValue for XAckDelStatusCode {
    /// Converts an integer reply into a status code.
    ///
    /// # Errors
    ///
    /// Fails when the value is not an integer or when the integer is not one
    /// of `-1`, `1` or `2`; in the latter case the message includes the
    /// offending code.
    fn from_redis_value(v: Value) -> Result<Self, ParsingError> {
        match v {
            Value::Int(code) => match code {
                -1 => Ok(XAckDelStatusCode::IdNotFound),
                1 => Ok(XAckDelStatusCode::AcknowledgedAndDeleted),
                2 => Ok(XAckDelStatusCode::AcknowledgedNotDeletedStillReferenced),
                // `format!` is required here: a static literal would not
                // interpolate `{code}`.
                _ => Err(format!("Invalid XAckDelStatusCode status code: {code}").into()),
            },
            _ => Err(arcstr::literal!("Response type not XAckDelStatusCode compatible").into()),
        }
    }
}
1198
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes `object`, joins its arguments with single spaces and
    /// compares the resulting bytes with `expected`.
    fn assert_command_eq(object: impl ToRedisArgs, expected: &[u8]) {
        let mut args: Vec<Vec<u8>> = Vec::new();
        object.write_redis_args(&mut args);

        let cmd: Vec<u8> = args.join(&b' ');

        assert_eq!(cmd, expected);
    }

    mod stream_auto_claim_reply {
        use super::*;
        use crate::Value;

        const NEXT_ID: &str = "1713465536578-0";
        const FIRST_ID: &str = "1713465533411-0";
        const SECOND_ID: &str = "1713465536069-0";
        const DELETED_ID: &str = "123456789-0";

        /// Wraps `text` in a bulk string value.
        fn bulk(text: &str) -> Value {
            Value::BulkString(text.into())
        }

        /// Builds an `[id, [field, value, ...]]` entry.
        fn entry(id: &str, fields: &[&str]) -> Value {
            let fields: Vec<Value> = fields.iter().copied().map(bulk).collect();
            Value::Array(vec![bulk(id), Value::Array(fields)])
        }

        /// Claimed list holding two full entries.
        fn full_entries() -> Value {
            Value::Array(vec![
                entry(FIRST_ID, &["name", "test", "other", "whaterver"]),
                entry(SECOND_ID, &["name", "another test", "other", "something"]),
            ])
        }

        /// Claimed list holding two bare ids.
        fn id_only_entries() -> Value {
            Value::Array(vec![bulk(FIRST_ID), bulk(SECOND_ID)])
        }

        /// Deleted-ids list holding a single id.
        fn deleted_list() -> Value {
            Value::Array(vec![bulk(DELETED_ID)])
        }

        /// Collects the ids of all claimed entries.
        fn claimed_ids(reply: &StreamAutoClaimReply) -> Vec<&str> {
            reply.claimed.iter().map(|e| e.id.as_str()).collect()
        }

        #[test]
        fn short_response() {
            let value = Value::Array(vec![bulk(NEXT_ID)]);

            StreamAutoClaimReply::from_redis_value(value).unwrap_err();
        }

        #[test]
        fn parses_none_claimed_response() {
            let value = Value::Array(vec![
                bulk("0-0"),
                Value::Array(vec![]),
                Value::Array(vec![]),
            ]);

            let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();

            assert_eq!(reply.next_stream_id.as_str(), "0-0");
            assert_eq!(reply.claimed.len(), 0);
            assert_eq!(reply.deleted_ids.len(), 0);
        }

        #[test]
        fn parses_response() {
            let value = Value::Array(vec![bulk(NEXT_ID), full_entries(), deleted_list()]);

            let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();

            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
            assert_eq!(reply.claimed.len(), 2);
            assert_eq!(reply.claimed[0].id.as_str(), "1713465533411-0");
            assert!(
                matches!(reply.claimed[0].map.get("name"), Some(Value::BulkString(v)) if v == "test".as_bytes())
            );
            assert_eq!(reply.claimed[1].id.as_str(), "1713465536069-0");
            assert_eq!(reply.deleted_ids.len(), 1);
            assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
        }

        #[test]
        fn parses_v6_response() {
            // Two-item reply: no deleted-ids list.
            let value = Value::Array(vec![bulk(NEXT_ID), full_entries()]);

            let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();

            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
            assert_eq!(reply.claimed.len(), 2);
            let ids = claimed_ids(&reply);
            assert!(ids.contains(&"1713465533411-0"));
            assert!(ids.contains(&"1713465536069-0"));
            assert_eq!(reply.deleted_ids.len(), 0);
        }

        #[test]
        fn parses_justid_response() {
            let value = Value::Array(vec![bulk(NEXT_ID), id_only_entries(), deleted_list()]);

            let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();

            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
            assert_eq!(reply.claimed.len(), 2);
            let ids = claimed_ids(&reply);
            assert!(ids.contains(&"1713465533411-0"));
            assert!(ids.contains(&"1713465536069-0"));
            assert_eq!(reply.deleted_ids.len(), 1);
            assert!(reply.deleted_ids.contains(&"123456789-0".to_string()))
        }

        #[test]
        fn parses_v6_justid_response() {
            // Two-item reply: no deleted-ids list.
            let value = Value::Array(vec![bulk(NEXT_ID), id_only_entries()]);

            let reply: StreamAutoClaimReply = FromRedisValue::from_redis_value(value).unwrap();

            assert_eq!(reply.next_stream_id.as_str(), "1713465536578-0");
            assert_eq!(reply.claimed.len(), 2);
            let ids = claimed_ids(&reply);
            assert!(ids.contains(&"1713465533411-0"));
            assert!(ids.contains(&"1713465536069-0"));
            assert_eq!(reply.deleted_ids.len(), 0);
        }
    }

    mod stream_trim_options {
        use super::*;

        #[test]
        fn maxlen_trim() {
            assert_command_eq(
                StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10),
                b"MAXLEN ~ 10",
            );
        }

        #[test]
        fn maxlen_exact_trim() {
            assert_command_eq(
                StreamTrimOptions::maxlen(StreamTrimmingMode::Exact, 10),
                b"MAXLEN = 10",
            );
        }

        #[test]
        fn maxlen_trim_limit() {
            assert_command_eq(
                StreamTrimOptions::maxlen(StreamTrimmingMode::Approx, 10).limit(5),
                b"MAXLEN ~ 10 LIMIT 5",
            );
        }

        #[test]
        fn minid_trim_limit() {
            assert_command_eq(
                StreamTrimOptions::minid(StreamTrimmingMode::Exact, "123456-7").limit(5),
                b"MINID = 123456-7 LIMIT 5",
            );
        }
    }

    mod stream_add_options {
        use super::*;

        /// Strategy shared by the trimming tests below.
        fn exact_maxlen_10() -> StreamTrimStrategy {
            StreamTrimStrategy::maxlen(StreamTrimmingMode::Exact, 10)
        }

        #[test]
        fn the_default() {
            assert_command_eq(StreamAddOptions::default(), b"");
        }

        #[test]
        fn with_maxlen_trim() {
            assert_command_eq(
                StreamAddOptions::default().trim(exact_maxlen_10()),
                b"MAXLEN = 10",
            );
        }

        #[test]
        fn with_nomkstream() {
            assert_command_eq(StreamAddOptions::default().nomkstream(), b"NOMKSTREAM");
        }

        #[test]
        fn with_nomkstream_and_maxlen_trim() {
            assert_command_eq(
                StreamAddOptions::default()
                    .nomkstream()
                    .trim(exact_maxlen_10()),
                b"NOMKSTREAM MAXLEN = 10",
            );
        }
    }
}