1use std::{
2 io::{self, Read},
3 str,
4};
5
6use crate::errors::{ParsingError, RedisError, Repr, ServerError, ServerErrorKind};
7use crate::types::{PushKind, RedisResult, Value, VerbatimFormat};
8
9use combine::{
10 ParseError, Parser as _, any,
11 error::StreamError,
12 opaque,
13 parser::{
14 byte::{crlf, take_until_bytes},
15 combinator::{AnySendSyncPartialState, any_send_sync_partial_state},
16 range::{recognize, take},
17 },
18 stream::{
19 PointerOffset, RangeStream, StreamErrorFor,
20 decoder::{self, Decoder},
21 },
22 unexpected_any,
23};
24
/// Deepest nesting level of RESP values that `value` will parse. A value whose depth
/// counter exceeds this limit fails with a "Maximum recursion depth exceeded" parse error.
const MAX_RECURSE_DEPTH: usize = 100;
26
27fn err_parser(line: &str) -> ServerError {
28 let mut pieces = line.splitn(2, ' ');
29 let kind = match pieces.next().unwrap() {
30 "ERR" => ServerErrorKind::ResponseError,
31 "EXECABORT" => ServerErrorKind::ExecAbort,
32 "LOADING" => ServerErrorKind::BusyLoading,
33 "NOSCRIPT" => ServerErrorKind::NoScript,
34 "MOVED" => ServerErrorKind::Moved,
35 "ASK" => ServerErrorKind::Ask,
36 "TRYAGAIN" => ServerErrorKind::TryAgain,
37 "CLUSTERDOWN" => ServerErrorKind::ClusterDown,
38 "CROSSSLOT" => ServerErrorKind::CrossSlot,
39 "MASTERDOWN" => ServerErrorKind::MasterDown,
40 "READONLY" => ServerErrorKind::ReadOnly,
41 "NOTBUSY" => ServerErrorKind::NotBusy,
42 "NOSUB" => ServerErrorKind::NoSub,
43 "NOPERM" => ServerErrorKind::NoPerm,
44 code => {
45 return ServerError(Repr::Extension {
46 code: code.into(),
47 detail: pieces.next().map(|str| str.into()),
48 });
49 }
50 };
51 let detail = pieces.next().map(|str| str.into());
52 ServerError(Repr::Known { kind, detail })
53}
54
55pub fn get_push_kind(kind: String) -> PushKind {
56 match kind.as_str() {
57 "invalidate" => PushKind::Invalidate,
58 "message" => PushKind::Message,
59 "pmessage" => PushKind::PMessage,
60 "smessage" => PushKind::SMessage,
61 "unsubscribe" => PushKind::Unsubscribe,
62 "punsubscribe" => PushKind::PUnsubscribe,
63 "sunsubscribe" => PushKind::SUnsubscribe,
64 "subscribe" => PushKind::Subscribe,
65 "psubscribe" => PushKind::PSubscribe,
66 "ssubscribe" => PushKind::SSubscribe,
67 _ => PushKind::Other(kind),
68 }
69}
70
71fn value<'a, I>(
72 count: Option<usize>,
73) -> impl combine::Parser<I, Output = Value, PartialState = AnySendSyncPartialState>
74where
75 I: RangeStream<Token = u8, Range = &'a [u8]>,
76 I::Error: combine::ParseError<u8, &'a [u8], I::Position>,
77{
78 let count = count.unwrap_or(1);
79
80 opaque!(any_send_sync_partial_state(
81 any()
82 .then_partial(move |&mut b| {
83 if count > MAX_RECURSE_DEPTH {
84 combine::unexpected_any("Maximum recursion depth exceeded").left()
85 } else {
86 combine::value(b).right()
87 }
88 })
89 .then_partial(move |&mut b| {
90 let line = || {
91 recognize(take_until_bytes(&b"\r\n"[..]).with(take(2).map(|_| ()))).and_then(
92 |line: &[u8]| {
93 str::from_utf8(&line[..line.len() - 2])
94 .map_err(StreamErrorFor::<I>::other)
95 },
96 )
97 };
98
99 let simple_string = || {
100 line().map(|line| {
101 if line == "OK" {
102 Value::Okay
103 } else {
104 Value::SimpleString(line.into())
105 }
106 })
107 };
108
109 let int = || {
110 line().and_then(|line| {
111 line.trim().parse::<i64>().map_err(|_| {
112 StreamErrorFor::<I>::message_static_message(
113 "Expected integer, got garbage",
114 )
115 })
116 })
117 };
118
119 let bulk_string = || {
120 int().then_partial(move |size| {
121 if *size < 0 {
122 combine::produce(|| Value::Nil).left()
123 } else {
124 take(*size as usize)
125 .map(|bs: &[u8]| Value::BulkString(bs.to_vec()))
126 .skip(crlf())
127 .right()
128 }
129 })
130 };
131 let blob = || {
132 int().then_partial(move |size| {
133 take(*size as usize)
134 .map(|bs: &[u8]| String::from_utf8_lossy(bs).to_string())
135 .skip(crlf())
136 })
137 };
138
139 let array = || {
140 int().then_partial(move |&mut length| {
141 if length < 0 {
142 combine::produce(|| Value::Nil).left()
143 } else {
144 let length = length as usize;
145 combine::count_min_max(length, length, value(Some(count + 1)))
146 .map(Value::Array)
147 .right()
148 }
149 })
150 };
151
152 let error = || line().map(err_parser);
153 let map = || {
154 int().then_partial(move |&mut kv_length| {
155 match (kv_length as usize).checked_mul(2) {
156 Some(length) => {
157 combine::count_min_max(length, length, value(Some(count + 1)))
158 .map(move |result: Vec<Value>| {
159 let mut it = result.into_iter();
160 let mut x = vec![];
161 for _ in 0..kv_length {
162 if let (Some(k), Some(v)) = (it.next(), it.next()) {
163 x.push((k, v))
164 }
165 }
166 Value::Map(x)
167 })
168 .left()
169 }
170 None => {
171 unexpected_any("Attribute key-value length is too large").right()
172 }
173 }
174 })
175 };
176 let attribute = || {
177 int().then_partial(move |&mut kv_length| {
178 match (kv_length as usize).checked_mul(2) {
179 Some(length) => {
180 let length = length + 1;
182 combine::count_min_max(length, length, value(Some(count + 1)))
183 .map(move |result: Vec<Value>| {
184 let mut it = result.into_iter();
185 let mut attributes = vec![];
186 for _ in 0..kv_length {
187 if let (Some(k), Some(v)) = (it.next(), it.next()) {
188 attributes.push((k, v))
189 }
190 }
191 Value::Attribute {
192 data: Box::new(it.next().unwrap()),
193 attributes,
194 }
195 })
196 .left()
197 }
198 None => {
199 unexpected_any("Attribute key-value length is too large").right()
200 }
201 }
202 })
203 };
204 let set = || {
205 int().then_partial(move |&mut length| {
206 if length < 0 {
207 combine::produce(|| Value::Nil).left()
208 } else {
209 let length = length as usize;
210 combine::count_min_max(length, length, value(Some(count + 1)))
211 .map(Value::Set)
212 .right()
213 }
214 })
215 };
216 let push = || {
217 int().then_partial(move |&mut length| {
218 if length <= 0 {
219 combine::produce(|| Value::Push {
220 kind: PushKind::Other("".to_string()),
221 data: vec![],
222 })
223 .left()
224 } else {
225 let length = length as usize;
226 combine::count_min_max(length, length, value(Some(count + 1)))
227 .and_then(|result: Vec<Value>| {
228 let mut it = result.into_iter();
229 let first = it.next().unwrap_or(Value::Nil);
230 if let Value::BulkString(kind) = first {
231 let push_kind = String::from_utf8(kind)
232 .map_err(StreamErrorFor::<I>::other)?;
233 Ok(Value::Push {
234 kind: get_push_kind(push_kind),
235 data: it.collect(),
236 })
237 } else if let Value::SimpleString(kind) = first {
238 Ok(Value::Push {
239 kind: get_push_kind(kind),
240 data: it.collect(),
241 })
242 } else {
243 Err(StreamErrorFor::<I>::message_static_message(
244 "parse error when decoding push",
245 ))
246 }
247 })
248 .right()
249 }
250 })
251 };
252 let null = || line().map(|_| Value::Nil);
253 let double = || {
254 line().and_then(|line| {
255 line.trim()
256 .parse::<f64>()
257 .map_err(StreamErrorFor::<I>::other)
258 })
259 };
260 let boolean = || {
261 line().and_then(|line: &str| match line {
262 "t" => Ok(true),
263 "f" => Ok(false),
264 _ => Err(StreamErrorFor::<I>::message_static_message(
265 "Expected boolean, got garbage",
266 )),
267 })
268 };
269 let blob_error = || blob().map(|line| err_parser(&line));
270 let verbatim = || {
271 blob().and_then(|line| {
272 if let Some((format, text)) = line.split_once(':') {
273 let format = match format {
274 "txt" => VerbatimFormat::Text,
275 "mkd" => VerbatimFormat::Markdown,
276 x => VerbatimFormat::Unknown(x.to_string()),
277 };
278 Ok(Value::VerbatimString {
279 format,
280 text: text.to_string(),
281 })
282 } else {
283 Err(StreamErrorFor::<I>::message_static_message(
284 "parse error when decoding verbatim string",
285 ))
286 }
287 })
288 };
289 let big_number = || {
290 line().and_then(|line| {
291 #[cfg(not(feature = "num-bigint"))]
292 return Ok::<_, StreamErrorFor<I>>(Value::BigNumber(
293 line.as_bytes().to_vec(),
294 ));
295 #[cfg(feature = "num-bigint")]
296 num_bigint::BigInt::parse_bytes(line.as_bytes(), 10)
297 .ok_or_else(|| {
298 StreamErrorFor::<I>::message_static_message(
299 "Expected bigint, got garbage",
300 )
301 })
302 .map(Value::BigNumber)
303 })
304 };
305 combine::dispatch!(b;
306 b'+' => simple_string(),
307 b':' => int().map(Value::Int),
308 b'$' => bulk_string(),
309 b'*' => array(),
310 b'%' => map(),
311 b'|' => attribute(),
312 b'~' => set(),
313 b'-' => error().map(Value::ServerError),
314 b'_' => null(),
315 b',' => double().map(Value::Double),
316 b'#' => boolean().map(Value::Boolean),
317 b'!' => blob_error().map(Value::ServerError),
318 b'=' => verbatim(),
319 b'(' => big_number(),
320 b'>' => push(),
321 b => combine::unexpected_any(combine::error::Token(b))
322 )
323 })
324 ))
325}
326
// Converts a `combine` decoder error into a `RedisError`.
//
// `$err` is the `decoder::Error` to convert; `$decoder` is the decoder that produced it,
// whose buffer is used to turn pointer-based positions into offsets.
macro_rules! to_redis_err {
    ($err: expr, $decoder: expr) => {
        match $err {
            // Running out of input is surfaced as an I/O-level `UnexpectedEof`.
            decoder::Error::Parse(parse_err) if parse_err.is_unexpected_end_of_input() => {
                RedisError::from(io::Error::from(io::ErrorKind::UnexpectedEof))
            }
            decoder::Error::Parse(parse_err) => {
                // Ranges are rendered to owned strings first, then positions are
                // translated against the decoder's buffer.
                let message = parse_err
                    .map_range(|range| format!("{range:?}"))
                    .map_position(|pos| pos.translate_position($decoder.buffer()))
                    .to_string();
                RedisError::from(ParsingError::from(message))
            }
            decoder::Error::Io { error, .. } => error.into(),
        }
    };
}
346
#[cfg(feature = "aio")]
mod aio_support {
    use super::*;

    use bytes::{Buf, BytesMut};
    use tokio::io::AsyncRead;
    use tokio_util::codec::{Decoder, Encoder};

    /// A `tokio_util` codec that decodes RESP values and writes already-encoded bytes.
    ///
    /// It keeps the partial parser state between `decode` calls so that a value split
    /// across several reads can be resumed.
    #[derive(Default)]
    pub struct ValueCodec {
        state: AnySendSyncPartialState,
    }

    impl ValueCodec {
        /// Attempts to decode one value from the front of `bytes`.
        ///
        /// `eof` tells the parser that no more input will follow, so the stream is treated
        /// as complete rather than partial. The bytes the parser reports as consumed are
        /// removed from `bytes`. Parse failures are returned as a `ParsingError`-backed
        /// `RedisError`.
        fn decode_stream(&mut self, bytes: &mut BytesMut, eof: bool) -> RedisResult<Option<Value>> {
            let (decoded, consumed) = {
                let input = &bytes[..];
                let mut stream =
                    combine::easy::Stream(combine::stream::MaybePartialStream(input, !eof));
                combine::stream::decode_tokio(value(None), &mut stream, &mut self.state).map_err(
                    |err| {
                        let message = err
                            .map_position(|pos| pos.translate_position(input))
                            .map_range(|range| format!("{range:?}"))
                            .to_string();
                        RedisError::from(ParsingError::from(message))
                    },
                )?
            };

            bytes.advance(consumed);
            Ok(decoded)
        }
    }

    impl Encoder<Vec<u8>> for ValueCodec {
        type Error = RedisError;

        /// Appends `item` to `dst` unchanged; the caller supplies already-encoded bytes.
        fn encode(&mut self, item: Vec<u8>, dst: &mut BytesMut) -> Result<(), Self::Error> {
            dst.extend_from_slice(&item);
            Ok(())
        }
    }

    impl Decoder for ValueCodec {
        type Item = Value;
        type Error = RedisError;

        /// Decodes with the input treated as possibly incomplete.
        fn decode(&mut self, bytes: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
            self.decode_stream(bytes, false)
        }

        /// Decodes with the input treated as final (no more bytes will arrive).
        fn decode_eof(&mut self, bytes: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
            self.decode_stream(bytes, true)
        }
    }

    /// Reads from `read` through `decoder` until one complete value has been parsed.
    ///
    /// Decoder errors are converted with `to_redis_err!`.
    pub async fn parse_redis_value_async<R>(
        decoder: &mut combine::stream::Decoder<AnySendSyncPartialState, PointerOffset<[u8]>>,
        read: &mut R,
    ) -> RedisResult<Value>
    where
        R: AsyncRead + Unpin,
    {
        let outcome = combine::decode_tokio!(*decoder, *read, value(None), |input, _| {
            combine::stream::easy::Stream::from(input)
        });
        // Kept as a `match`: the error borrows the decoder's buffer until the macro has
        // converted it, so the decoder must not be captured by a closure here.
        match outcome {
            Ok(parsed) => Ok(parsed),
            Err(err) => Err(to_redis_err!(err, decoder)),
        }
    }
}
424
425#[cfg(feature = "aio")]
426#[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
427pub use self::aio_support::*;
428
/// A reusable RESP parser.
///
/// It owns the `combine` decoder that `parse_value` feeds from a reader, so the same
/// `Parser` can be used for successive values.
pub struct Parser {
    decoder: Decoder<AnySendSyncPartialState, PointerOffset<[u8]>>,
}
433
434impl Default for Parser {
435 fn default() -> Self {
436 Parser::new()
437 }
438}
439
440impl Parser {
445 pub fn new() -> Parser {
450 Parser {
451 decoder: Decoder::new(),
452 }
453 }
454
455 pub fn parse_value<T: Read>(&mut self, mut reader: T) -> RedisResult<Value> {
459 let mut decoder = &mut self.decoder;
460 let result = combine::decode!(decoder, reader, value(None), |input, _| {
461 combine::stream::easy::Stream::from(input)
462 });
463 match result {
464 Err(err) => Err(to_redis_err!(err, decoder)),
465 Ok(result) => Ok(result),
466 }
467 }
468}
469
470pub fn parse_redis_value(bytes: &[u8]) -> RedisResult<Value> {
475 let mut parser = Parser::new();
476 parser.parse_value(bytes)
477}
478
#[cfg(test)]
mod tests {
    use super::*;
    use crate::errors::ErrorKind;
    use assert_matches::assert_matches;

    /// After a complete value has been decoded, further `decode_eof` calls on the drained
    /// buffer yield `None` rather than an error.
    #[cfg(feature = "aio")]
    #[test]
    fn decode_eof_returns_none_at_eof() {
        use tokio_util::codec::Decoder;
        let mut codec = ValueCodec::default();

        let mut bytes = bytes::BytesMut::from(&b"+GET 123\r\n"[..]);
        assert_eq!(
            codec.decode_eof(&mut bytes),
            Ok(Some(parse_redis_value(b"+GET 123\r\n").unwrap()))
        );
        assert_eq!(codec.decode_eof(&mut bytes), Ok(None));
        assert_eq!(codec.decode_eof(&mut bytes), Ok(None));
    }

    /// A server error nested inside an array is returned as a value, and the codec keeps
    /// working for the next input.
    #[cfg(feature = "aio")]
    #[test]
    fn decode_eof_returns_error_inside_array_and_can_parse_more_inputs() {
        use tokio_util::codec::Decoder;
        let mut codec = ValueCodec::default();

        let mut bytes =
            bytes::BytesMut::from(b"*3\r\n+OK\r\n-LOADING server is loading\r\n+OK\r\n".as_slice());
        let result = codec.decode_eof(&mut bytes).unwrap().unwrap();

        assert_eq!(
            result,
            Value::Array(vec![
                Value::Okay,
                Value::ServerError(ServerError(Repr::Known {
                    kind: ServerErrorKind::BusyLoading,
                    detail: Some(arcstr::literal!("server is loading"))
                })),
                Value::Okay
            ])
        );

        let mut bytes = bytes::BytesMut::from(b"+OK\r\n".as_slice());
        let result = codec.decode_eof(&mut bytes).unwrap().unwrap();

        assert_eq!(result, Value::Okay);
    }

    /// Same scenario as above, through the synchronous entry point.
    #[test]
    fn parse_nested_error_and_handle_more_inputs() {
        let bytes = b"*3\r\n+OK\r\n-LOADING server is loading\r\n+OK\r\n";
        let result = parse_redis_value(bytes);

        assert_eq!(
            result.unwrap(),
            Value::Array(vec![
                Value::Okay,
                Value::ServerError(ServerError(Repr::Known {
                    kind: ServerErrorKind::BusyLoading,
                    detail: Some(arcstr::literal!("server is loading"))
                })),
                Value::Okay
            ])
        );

        let result = parse_redis_value(b"+OK\r\n").unwrap();

        assert_eq!(result, Value::Okay);
    }

    #[test]
    fn decode_resp3_double() {
        let val = parse_redis_value(b",1.23\r\n").unwrap();
        assert_eq!(val, Value::Double(1.23));

        // NaN never compares equal, so check its sign and NaN-ness explicitly.
        let val = parse_redis_value(b",nan\r\n").unwrap();
        if let Value::Double(val) = val {
            assert!(val.is_sign_positive());
            assert!(val.is_nan());
        } else {
            panic!("expected double");
        }
        let val = parse_redis_value(b",-nan\r\n").unwrap();
        if let Value::Double(val) = val {
            assert!(val.is_sign_negative());
            assert!(val.is_nan());
        } else {
            panic!("expected double");
        }

        // Exponent notation, in both letter cases and with both signs.
        let val = parse_redis_value(b",2.67923e+8\r\n").unwrap();
        assert_eq!(val, Value::Double(267923000.0));
        let val = parse_redis_value(b",2.67923E+8\r\n").unwrap();
        assert_eq!(val, Value::Double(267923000.0));
        let val = parse_redis_value(b",-2.67923E+8\r\n").unwrap();
        assert_eq!(val, Value::Double(-267923000.0));
        let val = parse_redis_value(b",2.1E-2\r\n").unwrap();
        assert_eq!(val, Value::Double(0.021));

        let val = parse_redis_value(b",-inf\r\n").unwrap();
        assert_eq!(val, Value::Double(-f64::INFINITY));
        let val = parse_redis_value(b",inf\r\n").unwrap();
        assert_eq!(val, Value::Double(f64::INFINITY));
    }

    #[test]
    fn decode_resp3_map() {
        let val = parse_redis_value(b"%2\r\n+first\r\n:1\r\n+second\r\n:2\r\n").unwrap();
        let mut v = val.as_map_iter().unwrap();
        assert_eq!(
            (&Value::SimpleString("first".to_string()), &Value::Int(1)),
            v.next().unwrap()
        );
        assert_eq!(
            (&Value::SimpleString("second".to_string()), &Value::Int(2)),
            v.next().unwrap()
        );
    }

    #[test]
    fn decode_resp3_boolean() {
        let val = parse_redis_value(b"#t\r\n").unwrap();
        assert_eq!(val, Value::Boolean(true));
        let val = parse_redis_value(b"#f\r\n").unwrap();
        assert_eq!(val, Value::Boolean(false));
        // Anything other than exactly "t" or "f" is rejected, including an empty line.
        let val = parse_redis_value(b"#x\r\n");
        assert_matches!(val, Err(_));
        let val = parse_redis_value(b"#\r\n");
        assert_matches!(val, Err(_));
    }

    #[test]
    fn decode_resp3_blob_error() {
        let val = parse_redis_value(b"!21\r\nSYNTAX invalid syntax\r\n");
        assert_eq!(
            val.unwrap(),
            Value::ServerError(ServerError(Repr::Extension {
                code: arcstr::literal!("SYNTAX"),
                detail: Some(arcstr::literal!("invalid syntax"))
            }))
        )
    }

    #[test]
    fn decode_resp3_big_number() {
        let val = parse_redis_value(b"(3492890328409238509324850943850943825024385\r\n").unwrap();
        #[cfg(feature = "num-bigint")]
        let expected = Value::BigNumber(
            num_bigint::BigInt::parse_bytes(b"3492890328409238509324850943850943825024385", 10)
                .unwrap(),
        );
        #[cfg(not(feature = "num-bigint"))]
        let expected = Value::BigNumber(b"3492890328409238509324850943850943825024385".to_vec());
        assert_eq!(val, expected);
    }

    #[test]
    fn decode_resp3_set() {
        let val = parse_redis_value(b"~5\r\n+orange\r\n+apple\r\n#t\r\n:100\r\n:999\r\n").unwrap();
        let v = val.as_sequence().unwrap();
        assert_eq!(Value::SimpleString("orange".to_string()), v[0]);
        assert_eq!(Value::SimpleString("apple".to_string()), v[1]);
        assert_eq!(Value::Boolean(true), v[2]);
        assert_eq!(Value::Int(100), v[3]);
        assert_eq!(Value::Int(999), v[4]);
    }

    #[test]
    fn decode_resp3_push() {
        let val = parse_redis_value(b">3\r\n+message\r\n+somechannel\r\n+this is the message\r\n")
            .unwrap();
        if let Value::Push { ref kind, ref data } = val {
            assert_eq!(&PushKind::Message, kind);
            assert_eq!(Value::SimpleString("somechannel".to_string()), data[0]);
            assert_eq!(
                Value::SimpleString("this is the message".to_string()),
                data[1]
            );
        } else {
            panic!("Expected Value::Push")
        }
    }

    /// Nesting up to `MAX_RECURSE_DEPTH` levels parses; one level deeper is a parse error.
    #[test]
    fn test_max_recursion_depth_set_and_array() {
        for test_byte in ["*", "~"] {
            let initial = format!("{test_byte}1\r\n").into_bytes();
            let end = format!("{test_byte}0\r\n").into_bytes();

            // MAX_RECURSE_DEPTH - 1 wrappers plus the empty innermost aggregate.
            let mut ba = initial.repeat(MAX_RECURSE_DEPTH - 1);
            ba.extend_from_slice(&end);
            match parse_redis_value(&ba) {
                Ok(Value::Array(a)) => assert_eq!(a.len(), 1),
                Ok(Value::Set(s)) => assert_eq!(s.len(), 1),
                _ => panic!("Expected valid array or set"),
            }

            let mut ba = initial.repeat(MAX_RECURSE_DEPTH);
            ba.extend_from_slice(&end);
            match parse_redis_value(&ba) {
                Ok(_) => panic!("Expected ParseError"),
                Err(e) => assert!(matches!(e.kind(), ErrorKind::Parse)),
            }
        }
    }

    /// Same depth limit, exercised through nested maps (each map is the value of key `a`).
    #[test]
    fn test_max_recursion_depth_map() {
        let initial = b"%1\r\n+a\r\n";
        let end = b"%0\r\n";

        let mut ba = initial.repeat(MAX_RECURSE_DEPTH - 1);
        ba.extend_from_slice(end);
        match parse_redis_value(&ba) {
            Ok(Value::Map(m)) => assert_eq!(m.len(), 1),
            _ => panic!("Expected valid map"),
        }

        let mut ba = initial.repeat(MAX_RECURSE_DEPTH);
        ba.extend_from_slice(end);
        match parse_redis_value(&ba) {
            Ok(_) => panic!("Expected ParseError"),
            Err(e) => assert!(matches!(e.kind(), ErrorKind::Parse)),
        }
    }
}