redis/
parser.rs

1use std::{
2    io::{self, Read},
3    str,
4};
5
6use crate::types::{
7    ErrorKind, PushKind, RedisError, RedisResult, ServerError, ServerErrorKind, Value,
8    VerbatimFormat,
9};
10
11use combine::{
12    any,
13    error::StreamError,
14    opaque,
15    parser::{
16        byte::{crlf, take_until_bytes},
17        combinator::{any_send_sync_partial_state, AnySendSyncPartialState},
18        range::{recognize, take},
19    },
20    stream::{
21        decoder::{self, Decoder},
22        PointerOffset, RangeStream, StreamErrorFor,
23    },
24    unexpected_any, ParseError, Parser as _,
25};
26use num_bigint::BigInt;
27
/// Maximum nesting depth accepted for a single reply.
///
/// The top-level value is depth 1 and every element of an aggregate (array,
/// map, attribute, set, push) is parsed one level deeper; a value whose depth
/// exceeds this limit is rejected with a parse error instead of recursing further.
const MAX_RECURSE_DEPTH: usize = 100;
29
30fn err_parser(line: &str) -> ServerError {
31    let mut pieces = line.splitn(2, ' ');
32    let kind = match pieces.next().unwrap() {
33        "ERR" => ServerErrorKind::ResponseError,
34        "EXECABORT" => ServerErrorKind::ExecAbortError,
35        "LOADING" => ServerErrorKind::BusyLoadingError,
36        "NOSCRIPT" => ServerErrorKind::NoScriptError,
37        "MOVED" => ServerErrorKind::Moved,
38        "ASK" => ServerErrorKind::Ask,
39        "TRYAGAIN" => ServerErrorKind::TryAgain,
40        "CLUSTERDOWN" => ServerErrorKind::ClusterDown,
41        "CROSSSLOT" => ServerErrorKind::CrossSlot,
42        "MASTERDOWN" => ServerErrorKind::MasterDown,
43        "READONLY" => ServerErrorKind::ReadOnly,
44        "NOTBUSY" => ServerErrorKind::NotBusy,
45        "NOSUB" => ServerErrorKind::NoSub,
46        code => {
47            return ServerError::ExtensionError {
48                code: code.to_string(),
49                detail: pieces.next().map(|str| str.to_string()),
50            }
51        }
52    };
53    let detail = pieces.next().map(|str| str.to_string());
54    ServerError::KnownError { kind, detail }
55}
56
57pub fn get_push_kind(kind: String) -> PushKind {
58    match kind.as_str() {
59        "invalidate" => PushKind::Invalidate,
60        "message" => PushKind::Message,
61        "pmessage" => PushKind::PMessage,
62        "smessage" => PushKind::SMessage,
63        "unsubscribe" => PushKind::Unsubscribe,
64        "punsubscribe" => PushKind::PUnsubscribe,
65        "sunsubscribe" => PushKind::SUnsubscribe,
66        "subscribe" => PushKind::Subscribe,
67        "psubscribe" => PushKind::PSubscribe,
68        "ssubscribe" => PushKind::SSubscribe,
69        _ => PushKind::Other(kind),
70    }
71}
72
73fn value<'a, I>(
74    count: Option<usize>,
75) -> impl combine::Parser<I, Output = Value, PartialState = AnySendSyncPartialState>
76where
77    I: RangeStream<Token = u8, Range = &'a [u8]>,
78    I::Error: combine::ParseError<u8, &'a [u8], I::Position>,
79{
80    let count = count.unwrap_or(1);
81
82    opaque!(any_send_sync_partial_state(
83        any()
84            .then_partial(move |&mut b| {
85                if count > MAX_RECURSE_DEPTH {
86                    combine::unexpected_any("Maximum recursion depth exceeded").left()
87                } else {
88                    combine::value(b).right()
89                }
90            })
91            .then_partial(move |&mut b| {
92                let line = || {
93                    recognize(take_until_bytes(&b"\r\n"[..]).with(take(2).map(|_| ()))).and_then(
94                        |line: &[u8]| {
95                            str::from_utf8(&line[..line.len() - 2])
96                                .map_err(StreamErrorFor::<I>::other)
97                        },
98                    )
99                };
100
101                let simple_string = || {
102                    line().map(|line| {
103                        if line == "OK" {
104                            Value::Okay
105                        } else {
106                            Value::SimpleString(line.into())
107                        }
108                    })
109                };
110
111                let int = || {
112                    line().and_then(|line| {
113                        line.trim().parse::<i64>().map_err(|_| {
114                            StreamErrorFor::<I>::message_static_message(
115                                "Expected integer, got garbage",
116                            )
117                        })
118                    })
119                };
120
121                let bulk_string = || {
122                    int().then_partial(move |size| {
123                        if *size < 0 {
124                            combine::produce(|| Value::Nil).left()
125                        } else {
126                            take(*size as usize)
127                                .map(|bs: &[u8]| Value::BulkString(bs.to_vec()))
128                                .skip(crlf())
129                                .right()
130                        }
131                    })
132                };
133                let blob = || {
134                    int().then_partial(move |size| {
135                        take(*size as usize)
136                            .map(|bs: &[u8]| String::from_utf8_lossy(bs).to_string())
137                            .skip(crlf())
138                    })
139                };
140
141                let array = || {
142                    int().then_partial(move |&mut length| {
143                        if length < 0 {
144                            combine::produce(|| Value::Nil).left()
145                        } else {
146                            let length = length as usize;
147                            combine::count_min_max(length, length, value(Some(count + 1)))
148                                .map(Value::Array)
149                                .right()
150                        }
151                    })
152                };
153
154                let error = || line().map(err_parser);
155                let map = || {
156                    int().then_partial(move |&mut kv_length| {
157                        match (kv_length as usize).checked_mul(2) {
158                            Some(length) => {
159                                combine::count_min_max(length, length, value(Some(count + 1)))
160                                    .map(move |result: Vec<Value>| {
161                                        let mut it = result.into_iter();
162                                        let mut x = vec![];
163                                        for _ in 0..kv_length {
164                                            if let (Some(k), Some(v)) = (it.next(), it.next()) {
165                                                x.push((k, v))
166                                            }
167                                        }
168                                        Value::Map(x)
169                                    })
170                                    .left()
171                            }
172                            None => {
173                                unexpected_any("Attribute key-value length is too large").right()
174                            }
175                        }
176                    })
177                };
178                let attribute = || {
179                    int().then_partial(move |&mut kv_length| {
180                        match (kv_length as usize).checked_mul(2) {
181                            Some(length) => {
182                                // + 1 is for data!
183                                let length = length + 1;
184                                combine::count_min_max(length, length, value(Some(count + 1)))
185                                    .map(move |result: Vec<Value>| {
186                                        let mut it = result.into_iter();
187                                        let mut attributes = vec![];
188                                        for _ in 0..kv_length {
189                                            if let (Some(k), Some(v)) = (it.next(), it.next()) {
190                                                attributes.push((k, v))
191                                            }
192                                        }
193                                        Value::Attribute {
194                                            data: Box::new(it.next().unwrap()),
195                                            attributes,
196                                        }
197                                    })
198                                    .left()
199                            }
200                            None => {
201                                unexpected_any("Attribute key-value length is too large").right()
202                            }
203                        }
204                    })
205                };
206                let set = || {
207                    int().then_partial(move |&mut length| {
208                        if length < 0 {
209                            combine::produce(|| Value::Nil).left()
210                        } else {
211                            let length = length as usize;
212                            combine::count_min_max(length, length, value(Some(count + 1)))
213                                .map(Value::Set)
214                                .right()
215                        }
216                    })
217                };
218                let push = || {
219                    int().then_partial(move |&mut length| {
220                        if length <= 0 {
221                            combine::produce(|| Value::Push {
222                                kind: PushKind::Other("".to_string()),
223                                data: vec![],
224                            })
225                            .left()
226                        } else {
227                            let length = length as usize;
228                            combine::count_min_max(length, length, value(Some(count + 1)))
229                                .and_then(|result: Vec<Value>| {
230                                    let mut it = result.into_iter();
231                                    let first = it.next().unwrap_or(Value::Nil);
232                                    if let Value::BulkString(kind) = first {
233                                        let push_kind = String::from_utf8(kind)
234                                            .map_err(StreamErrorFor::<I>::other)?;
235                                        Ok(Value::Push {
236                                            kind: get_push_kind(push_kind),
237                                            data: it.collect(),
238                                        })
239                                    } else if let Value::SimpleString(kind) = first {
240                                        Ok(Value::Push {
241                                            kind: get_push_kind(kind),
242                                            data: it.collect(),
243                                        })
244                                    } else {
245                                        Err(StreamErrorFor::<I>::message_static_message(
246                                            "parse error when decoding push",
247                                        ))
248                                    }
249                                })
250                                .right()
251                        }
252                    })
253                };
254                let null = || line().map(|_| Value::Nil);
255                let double = || {
256                    line().and_then(|line| {
257                        line.trim()
258                            .parse::<f64>()
259                            .map_err(StreamErrorFor::<I>::other)
260                    })
261                };
262                let boolean = || {
263                    line().and_then(|line: &str| match line {
264                        "t" => Ok(true),
265                        "f" => Ok(false),
266                        _ => Err(StreamErrorFor::<I>::message_static_message(
267                            "Expected boolean, got garbage",
268                        )),
269                    })
270                };
271                let blob_error = || blob().map(|line| err_parser(&line));
272                let verbatim = || {
273                    blob().and_then(|line| {
274                        if let Some((format, text)) = line.split_once(':') {
275                            let format = match format {
276                                "txt" => VerbatimFormat::Text,
277                                "mkd" => VerbatimFormat::Markdown,
278                                x => VerbatimFormat::Unknown(x.to_string()),
279                            };
280                            Ok(Value::VerbatimString {
281                                format,
282                                text: text.to_string(),
283                            })
284                        } else {
285                            Err(StreamErrorFor::<I>::message_static_message(
286                                "parse error when decoding verbatim string",
287                            ))
288                        }
289                    })
290                };
291                let big_number = || {
292                    line().and_then(|line| {
293                        BigInt::parse_bytes(line.as_bytes(), 10).ok_or_else(|| {
294                            StreamErrorFor::<I>::message_static_message(
295                                "Expected bigint, got garbage",
296                            )
297                        })
298                    })
299                };
300                combine::dispatch!(b;
301                    b'+' => simple_string(),
302                    b':' => int().map(Value::Int),
303                    b'$' => bulk_string(),
304                    b'*' => array(),
305                    b'%' => map(),
306                    b'|' => attribute(),
307                    b'~' => set(),
308                    b'-' => error().map(Value::ServerError),
309                    b'_' => null(),
310                    b',' => double().map(Value::Double),
311                    b'#' => boolean().map(Value::Boolean),
312                    b'!' => blob_error().map(Value::ServerError),
313                    b'=' => verbatim(),
314                    b'(' => big_number().map(Value::BigNumber),
315                    b'>' => push(),
316                    b => combine::unexpected_any(combine::error::Token(b))
317                )
318            })
319    ))
320}
321
// Written as a macro rather than a function: the parse error is tied by
// lifetime to `decoder`'s buffer, which is awkward to express in a signature.
//
// Converts a combine `decoder::Error` into a `RedisError`:
// - I/O errors convert directly;
// - a parse error that only reports end of input becomes an `UnexpectedEof`
//   I/O error;
// - any other parse error becomes `ErrorKind::ParseError`, with positions
//   translated against `$decoder`'s buffer and ranges rendered with `{:?}`.
macro_rules! to_redis_err {
    ($err: expr, $decoder: expr) => {
        match $err {
            decoder::Error::Io { error, .. } => error.into(),
            decoder::Error::Parse(parse_err) if parse_err.is_unexpected_end_of_input() => {
                RedisError::from(io::Error::from(io::ErrorKind::UnexpectedEof))
            }
            decoder::Error::Parse(parse_err) => {
                let rendered = parse_err
                    .map_range(|range| format!("{range:?}"))
                    .map_position(|pos| pos.translate_position($decoder.buffer()))
                    .to_string();
                RedisError::from((ErrorKind::ParseError, "parse error", rendered))
            }
        }
    };
}
341
#[cfg(feature = "aio")]
mod aio_support {
    use super::*;

    use bytes::{Buf, BytesMut};
    use tokio::io::AsyncRead;
    use tokio_util::codec::{Decoder, Encoder};

    /// `tokio_util` codec that decodes Redis replies into [`Value`]s and writes
    /// already-encoded command bytes through unchanged.
    #[derive(Default)]
    pub struct ValueCodec {
        // combine's partial-parse progress, carried between `decode` calls so a
        // reply split across several reads can be resumed.
        state: AnySendSyncPartialState,
    }

    impl ValueCodec {
        /// Tries to decode one value from the front of `bytes`.
        ///
        /// Returns `Ok(None)` when no complete value could be produced yet, and
        /// advances `bytes` by however many bytes combine reports as consumed.
        /// `eof` switches the stream out of partial mode, so incomplete input is
        /// reported as an error rather than awaited.
        ///
        /// # Errors
        /// Returns `ErrorKind::ParseError` when the input is not valid RESP.
        fn decode_stream(&mut self, bytes: &mut BytesMut, eof: bool) -> RedisResult<Option<Value>> {
            let (opt, removed_len) = {
                let buffer = &bytes[..];
                let mut stream =
                    combine::easy::Stream(combine::stream::MaybePartialStream(buffer, !eof));
                match combine::stream::decode_tokio(value(None), &mut stream, &mut self.state) {
                    Ok(x) => x,
                    Err(err) => {
                        let err = err
                            .map_position(|pos| pos.translate_position(buffer))
                            .map_range(|range| format!("{range:?}"))
                            .to_string();
                        // `bytes` is not advanced on error, so drop any half-finished
                        // parse; otherwise a reused codec would resume stale partial
                        // state against input it has not actually consumed.
                        self.state = Default::default();
                        return Err(RedisError::from((
                            ErrorKind::ParseError,
                            "parse error",
                            err,
                        )));
                    }
                }
            };

            bytes.advance(removed_len);
            Ok(opt)
        }
    }

    impl Encoder<Vec<u8>> for ValueCodec {
        type Error = RedisError;
        /// Appends the already-encoded command bytes to `dst` verbatim.
        fn encode(&mut self, item: Vec<u8>, dst: &mut BytesMut) -> Result<(), Self::Error> {
            dst.extend_from_slice(&item);
            Ok(())
        }
    }

    impl Decoder for ValueCodec {
        type Item = Value;
        type Error = RedisError;

        fn decode(&mut self, bytes: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
            self.decode_stream(bytes, false)
        }

        fn decode_eof(&mut self, bytes: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
            self.decode_stream(bytes, true)
        }
    }

    /// Parses a redis value asynchronously.
    ///
    /// Reads from `read` through `decoder` until one complete value is decoded.
    ///
    /// # Errors
    /// Returns the underlying I/O error from `read`, an `UnexpectedEof` I/O error
    /// if the input ends mid-value, or `ErrorKind::ParseError` for malformed input.
    pub async fn parse_redis_value_async<R>(
        decoder: &mut combine::stream::Decoder<AnySendSyncPartialState, PointerOffset<[u8]>>,
        read: &mut R,
    ) -> RedisResult<Value>
    where
        R: AsyncRead + std::marker::Unpin,
    {
        let result = combine::decode_tokio!(*decoder, *read, value(None), |input, _| {
            combine::stream::easy::Stream::from(input)
        });
        match result {
            Err(err) => Err(to_redis_err!(err, decoder)),
            Ok(result) => Ok(result),
        }
    }
}
423
424#[cfg(feature = "aio")]
425#[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
426pub use self::aio_support::*;
427
/// The internal redis response parser.
///
/// Wraps a combine `Decoder` configured with this module's partial-parse state
/// type; see [`Parser::parse_value`] for how it is driven.
pub struct Parser {
    // Kept on the struct (rather than created per call) so it persists across
    // successive `parse_value` calls.
    decoder: Decoder<AnySendSyncPartialState, PointerOffset<[u8]>>,
}
432
433impl Default for Parser {
434    fn default() -> Self {
435        Parser::new()
436    }
437}
438
439/// The parser can be used to parse redis responses into values.  Generally
440/// you normally do not use this directly as it's already done for you by
441/// the client but in some more complex situations it might be useful to be
442/// able to parse the redis responses.
443impl Parser {
444    /// Creates a new parser that parses the data behind the reader.  More
445    /// than one value can be behind the reader in which case the parser can
446    /// be invoked multiple times.  In other words: the stream does not have
447    /// to be terminated.
448    pub fn new() -> Parser {
449        Parser {
450            decoder: Decoder::new(),
451        }
452    }
453
454    // public api
455
456    /// Parses synchronously into a single value from the reader.
457    pub fn parse_value<T: Read>(&mut self, mut reader: T) -> RedisResult<Value> {
458        let mut decoder = &mut self.decoder;
459        let result = combine::decode!(decoder, reader, value(None), |input, _| {
460            combine::stream::easy::Stream::from(input)
461        });
462        match result {
463            Err(err) => Err(to_redis_err!(err, decoder)),
464            Ok(result) => Ok(result),
465        }
466    }
467}
468
469/// Parses bytes into a redis value.
470///
471/// This is the most straightforward way to parse something into a low
472/// level redis value instead of having to use a whole parser.
473pub fn parse_redis_value(bytes: &[u8]) -> RedisResult<Value> {
474    let mut parser = Parser::new();
475    parser.parse_value(bytes)
476}
477
#[cfg(test)]
mod tests {
    use super::*;

    #[cfg(feature = "aio")]
    #[test]
    fn decode_eof_returns_none_at_eof() {
        use tokio_util::codec::Decoder;
        let mut codec = ValueCodec::default();

        let mut bytes = bytes::BytesMut::from(&b"+GET 123\r\n"[..]);
        assert_eq!(
            codec.decode_eof(&mut bytes),
            Ok(Some(parse_redis_value(b"+GET 123\r\n").unwrap()))
        );
        assert_eq!(codec.decode_eof(&mut bytes), Ok(None));
        assert_eq!(codec.decode_eof(&mut bytes), Ok(None));
    }

    #[cfg(feature = "aio")]
    #[test]
    fn decode_eof_returns_error_inside_array_and_can_parse_more_inputs() {
        use tokio_util::codec::Decoder;
        let mut codec = ValueCodec::default();

        let mut bytes =
            bytes::BytesMut::from(b"*3\r\n+OK\r\n-LOADING server is loading\r\n+OK\r\n".as_slice());
        let result = codec.decode_eof(&mut bytes).unwrap().unwrap();

        assert_eq!(
            result,
            Value::Array(vec![
                Value::Okay,
                Value::ServerError(ServerError::KnownError {
                    kind: ServerErrorKind::BusyLoadingError,
                    detail: Some("server is loading".to_string())
                }),
                Value::Okay
            ])
        );

        let mut bytes = bytes::BytesMut::from(b"+OK\r\n".as_slice());
        let result = codec.decode_eof(&mut bytes).unwrap().unwrap();

        assert_eq!(result, Value::Okay);
    }

    #[test]
    fn parse_nested_error_and_handle_more_inputs() {
        // from https://redis.io/docs/interact/transactions/ -
        // "EXEC returned two-element bulk string reply where one is an OK code and the other an error reply. It's up to the client library to find a sensible way to provide the error to the user."

        let bytes = b"*3\r\n+OK\r\n-LOADING server is loading\r\n+OK\r\n";
        let result = parse_redis_value(bytes);

        assert_eq!(
            result.unwrap(),
            Value::Array(vec![
                Value::Okay,
                Value::ServerError(ServerError::KnownError {
                    kind: ServerErrorKind::BusyLoadingError,
                    detail: Some("server is loading".to_string())
                }),
                Value::Okay
            ])
        );

        let result = parse_redis_value(b"+OK\r\n").unwrap();

        assert_eq!(result, Value::Okay);
    }

    #[test]
    fn decode_resp3_double() {
        let val = parse_redis_value(b",1.23\r\n").unwrap();
        assert_eq!(val, Value::Double(1.23));
        let val = parse_redis_value(b",nan\r\n").unwrap();
        if let Value::Double(val) = val {
            assert!(val.is_sign_positive());
            assert!(val.is_nan());
        } else {
            panic!("expected double");
        }
        // -nan is supported prior to redis 7.2
        let val = parse_redis_value(b",-nan\r\n").unwrap();
        if let Value::Double(val) = val {
            assert!(val.is_sign_negative());
            assert!(val.is_nan());
        } else {
            panic!("expected double");
        }
        // Allow doubles in scientific E notation
        let val = parse_redis_value(b",2.67923e+8\r\n").unwrap();
        assert_eq!(val, Value::Double(267923000.0));
        let val = parse_redis_value(b",2.67923E+8\r\n").unwrap();
        assert_eq!(val, Value::Double(267923000.0));
        let val = parse_redis_value(b",-2.67923E+8\r\n").unwrap();
        assert_eq!(val, Value::Double(-267923000.0));
        let val = parse_redis_value(b",2.1E-2\r\n").unwrap();
        assert_eq!(val, Value::Double(0.021));

        let val = parse_redis_value(b",-inf\r\n").unwrap();
        assert_eq!(val, Value::Double(-f64::INFINITY));
        let val = parse_redis_value(b",inf\r\n").unwrap();
        assert_eq!(val, Value::Double(f64::INFINITY));
    }

    #[test]
    fn decode_resp3_map() {
        let val = parse_redis_value(b"%2\r\n+first\r\n:1\r\n+second\r\n:2\r\n").unwrap();
        let mut v = val.as_map_iter().unwrap();
        assert_eq!(
            (&Value::SimpleString("first".to_string()), &Value::Int(1)),
            v.next().unwrap()
        );
        assert_eq!(
            (&Value::SimpleString("second".to_string()), &Value::Int(2)),
            v.next().unwrap()
        );
    }

    #[test]
    fn decode_resp3_boolean() {
        let val = parse_redis_value(b"#t\r\n").unwrap();
        assert_eq!(val, Value::Boolean(true));
        let val = parse_redis_value(b"#f\r\n").unwrap();
        assert_eq!(val, Value::Boolean(false));
        let val = parse_redis_value(b"#x\r\n");
        assert!(val.is_err());
        let val = parse_redis_value(b"#\r\n");
        assert!(val.is_err());
    }

    #[test]
    fn decode_resp3_blob_error() {
        let val = parse_redis_value(b"!21\r\nSYNTAX invalid syntax\r\n");
        assert_eq!(
            val.unwrap(),
            Value::ServerError(ServerError::ExtensionError {
                code: "SYNTAX".to_string(),
                detail: Some("invalid syntax".to_string())
            })
        )
    }

    #[test]
    fn decode_resp3_negative_blob_length_is_error() {
        // Blob errors and verbatim strings have no null form; a negative length
        // must never yield a value.
        assert!(parse_redis_value(b"!-1\r\n").is_err());
        assert!(parse_redis_value(b"=-1\r\n").is_err());
    }

    #[test]
    fn decode_resp3_big_number() {
        let val = parse_redis_value(b"(3492890328409238509324850943850943825024385\r\n").unwrap();
        assert_eq!(
            val,
            Value::BigNumber(
                BigInt::parse_bytes(b"3492890328409238509324850943850943825024385", 10).unwrap()
            )
        );
    }

    #[test]
    fn decode_resp3_set() {
        let val = parse_redis_value(b"~5\r\n+orange\r\n+apple\r\n#t\r\n:100\r\n:999\r\n").unwrap();
        let v = val.as_sequence().unwrap();
        assert_eq!(Value::SimpleString("orange".to_string()), v[0]);
        assert_eq!(Value::SimpleString("apple".to_string()), v[1]);
        assert_eq!(Value::Boolean(true), v[2]);
        assert_eq!(Value::Int(100), v[3]);
        assert_eq!(Value::Int(999), v[4]);
    }

    #[test]
    fn decode_resp3_push() {
        let val = parse_redis_value(b">3\r\n+message\r\n+somechannel\r\n+this is the message\r\n")
            .unwrap();
        if let Value::Push { ref kind, ref data } = val {
            assert_eq!(&PushKind::Message, kind);
            assert_eq!(Value::SimpleString("somechannel".to_string()), data[0]);
            assert_eq!(
                Value::SimpleString("this is the message".to_string()),
                data[1]
            );
        } else {
            panic!("Expected Value::Push")
        }
    }

    #[test]
    fn test_max_recursion_depth_set_and_array() {
        for test_byte in ["*", "~"] {
            let initial = format!("{test_byte}1\r\n").as_bytes().to_vec();
            let end = format!("{test_byte}0\r\n").as_bytes().to_vec();

            let mut ba = initial.repeat(MAX_RECURSE_DEPTH - 1);
            ba.extend(end.clone());
            match parse_redis_value(&ba) {
                Ok(Value::Array(a)) => assert_eq!(a.len(), 1),
                Ok(Value::Set(s)) => assert_eq!(s.len(), 1),
                _ => panic!("Expected valid array or set"),
            }

            let mut ba = initial.repeat(MAX_RECURSE_DEPTH);
            ba.extend(end);
            match parse_redis_value(&ba) {
                Ok(_) => panic!("Expected ParseError"),
                Err(e) => assert!(matches!(e.kind(), ErrorKind::ParseError)),
            }
        }
    }

    #[test]
    fn test_max_recursion_depth_map() {
        let initial = b"%1\r\n+a\r\n";
        let end = b"%0\r\n";

        let mut ba = initial.repeat(MAX_RECURSE_DEPTH - 1);
        ba.extend(*end);
        match parse_redis_value(&ba) {
            Ok(Value::Map(m)) => assert_eq!(m.len(), 1),
            _ => panic!("Expected valid map"),
        }

        let mut ba = initial.repeat(MAX_RECURSE_DEPTH);
        ba.extend(end);
        match parse_redis_value(&ba) {
            Ok(_) => panic!("Expected ParseError"),
            Err(e) => assert!(matches!(e.kind(), ErrorKind::ParseError)),
        }
    }
}