1use std::{
2 io::{self, Read},
3 str,
4};
5
6use crate::types::{
7 ErrorKind, PushKind, RedisError, RedisResult, ServerError, ServerErrorKind, Value,
8 VerbatimFormat,
9};
10
11use combine::{
12 any,
13 error::StreamError,
14 opaque,
15 parser::{
16 byte::{crlf, take_until_bytes},
17 combinator::{any_send_sync_partial_state, AnySendSyncPartialState},
18 range::{recognize, take},
19 },
20 stream::{
21 decoder::{self, Decoder},
22 PointerOffset, RangeStream, StreamErrorFor,
23 },
24 unexpected_any, ParseError, Parser as _,
25};
26use num_bigint::BigInt;
27
28const MAX_RECURSE_DEPTH: usize = 100;
29
30fn err_parser(line: &str) -> ServerError {
31 let mut pieces = line.splitn(2, ' ');
32 let kind = match pieces.next().unwrap() {
33 "ERR" => ServerErrorKind::ResponseError,
34 "EXECABORT" => ServerErrorKind::ExecAbortError,
35 "LOADING" => ServerErrorKind::BusyLoadingError,
36 "NOSCRIPT" => ServerErrorKind::NoScriptError,
37 "MOVED" => ServerErrorKind::Moved,
38 "ASK" => ServerErrorKind::Ask,
39 "TRYAGAIN" => ServerErrorKind::TryAgain,
40 "CLUSTERDOWN" => ServerErrorKind::ClusterDown,
41 "CROSSSLOT" => ServerErrorKind::CrossSlot,
42 "MASTERDOWN" => ServerErrorKind::MasterDown,
43 "READONLY" => ServerErrorKind::ReadOnly,
44 "NOTBUSY" => ServerErrorKind::NotBusy,
45 "NOSUB" => ServerErrorKind::NoSub,
46 code => {
47 return ServerError::ExtensionError {
48 code: code.to_string(),
49 detail: pieces.next().map(|str| str.to_string()),
50 }
51 }
52 };
53 let detail = pieces.next().map(|str| str.to_string());
54 ServerError::KnownError { kind, detail }
55}
56
57pub fn get_push_kind(kind: String) -> PushKind {
58 match kind.as_str() {
59 "invalidate" => PushKind::Invalidate,
60 "message" => PushKind::Message,
61 "pmessage" => PushKind::PMessage,
62 "smessage" => PushKind::SMessage,
63 "unsubscribe" => PushKind::Unsubscribe,
64 "punsubscribe" => PushKind::PUnsubscribe,
65 "sunsubscribe" => PushKind::SUnsubscribe,
66 "subscribe" => PushKind::Subscribe,
67 "psubscribe" => PushKind::PSubscribe,
68 "ssubscribe" => PushKind::SSubscribe,
69 _ => PushKind::Other(kind),
70 }
71}
72
73fn value<'a, I>(
74 count: Option<usize>,
75) -> impl combine::Parser<I, Output = Value, PartialState = AnySendSyncPartialState>
76where
77 I: RangeStream<Token = u8, Range = &'a [u8]>,
78 I::Error: combine::ParseError<u8, &'a [u8], I::Position>,
79{
80 let count = count.unwrap_or(1);
81
82 opaque!(any_send_sync_partial_state(
83 any()
84 .then_partial(move |&mut b| {
85 if count > MAX_RECURSE_DEPTH {
86 combine::unexpected_any("Maximum recursion depth exceeded").left()
87 } else {
88 combine::value(b).right()
89 }
90 })
91 .then_partial(move |&mut b| {
92 let line = || {
93 recognize(take_until_bytes(&b"\r\n"[..]).with(take(2).map(|_| ()))).and_then(
94 |line: &[u8]| {
95 str::from_utf8(&line[..line.len() - 2])
96 .map_err(StreamErrorFor::<I>::other)
97 },
98 )
99 };
100
101 let simple_string = || {
102 line().map(|line| {
103 if line == "OK" {
104 Value::Okay
105 } else {
106 Value::SimpleString(line.into())
107 }
108 })
109 };
110
111 let int = || {
112 line().and_then(|line| {
113 line.trim().parse::<i64>().map_err(|_| {
114 StreamErrorFor::<I>::message_static_message(
115 "Expected integer, got garbage",
116 )
117 })
118 })
119 };
120
121 let bulk_string = || {
122 int().then_partial(move |size| {
123 if *size < 0 {
124 combine::produce(|| Value::Nil).left()
125 } else {
126 take(*size as usize)
127 .map(|bs: &[u8]| Value::BulkString(bs.to_vec()))
128 .skip(crlf())
129 .right()
130 }
131 })
132 };
133 let blob = || {
134 int().then_partial(move |size| {
135 take(*size as usize)
136 .map(|bs: &[u8]| String::from_utf8_lossy(bs).to_string())
137 .skip(crlf())
138 })
139 };
140
141 let array = || {
142 int().then_partial(move |&mut length| {
143 if length < 0 {
144 combine::produce(|| Value::Nil).left()
145 } else {
146 let length = length as usize;
147 combine::count_min_max(length, length, value(Some(count + 1)))
148 .map(Value::Array)
149 .right()
150 }
151 })
152 };
153
154 let error = || line().map(err_parser);
155 let map = || {
156 int().then_partial(move |&mut kv_length| {
157 match (kv_length as usize).checked_mul(2) {
158 Some(length) => {
159 combine::count_min_max(length, length, value(Some(count + 1)))
160 .map(move |result: Vec<Value>| {
161 let mut it = result.into_iter();
162 let mut x = vec![];
163 for _ in 0..kv_length {
164 if let (Some(k), Some(v)) = (it.next(), it.next()) {
165 x.push((k, v))
166 }
167 }
168 Value::Map(x)
169 })
170 .left()
171 }
172 None => {
173 unexpected_any("Attribute key-value length is too large").right()
174 }
175 }
176 })
177 };
178 let attribute = || {
179 int().then_partial(move |&mut kv_length| {
180 match (kv_length as usize).checked_mul(2) {
181 Some(length) => {
182 let length = length + 1;
184 combine::count_min_max(length, length, value(Some(count + 1)))
185 .map(move |result: Vec<Value>| {
186 let mut it = result.into_iter();
187 let mut attributes = vec![];
188 for _ in 0..kv_length {
189 if let (Some(k), Some(v)) = (it.next(), it.next()) {
190 attributes.push((k, v))
191 }
192 }
193 Value::Attribute {
194 data: Box::new(it.next().unwrap()),
195 attributes,
196 }
197 })
198 .left()
199 }
200 None => {
201 unexpected_any("Attribute key-value length is too large").right()
202 }
203 }
204 })
205 };
206 let set = || {
207 int().then_partial(move |&mut length| {
208 if length < 0 {
209 combine::produce(|| Value::Nil).left()
210 } else {
211 let length = length as usize;
212 combine::count_min_max(length, length, value(Some(count + 1)))
213 .map(Value::Set)
214 .right()
215 }
216 })
217 };
218 let push = || {
219 int().then_partial(move |&mut length| {
220 if length <= 0 {
221 combine::produce(|| Value::Push {
222 kind: PushKind::Other("".to_string()),
223 data: vec![],
224 })
225 .left()
226 } else {
227 let length = length as usize;
228 combine::count_min_max(length, length, value(Some(count + 1)))
229 .and_then(|result: Vec<Value>| {
230 let mut it = result.into_iter();
231 let first = it.next().unwrap_or(Value::Nil);
232 if let Value::BulkString(kind) = first {
233 let push_kind = String::from_utf8(kind)
234 .map_err(StreamErrorFor::<I>::other)?;
235 Ok(Value::Push {
236 kind: get_push_kind(push_kind),
237 data: it.collect(),
238 })
239 } else if let Value::SimpleString(kind) = first {
240 Ok(Value::Push {
241 kind: get_push_kind(kind),
242 data: it.collect(),
243 })
244 } else {
245 Err(StreamErrorFor::<I>::message_static_message(
246 "parse error when decoding push",
247 ))
248 }
249 })
250 .right()
251 }
252 })
253 };
254 let null = || line().map(|_| Value::Nil);
255 let double = || {
256 line().and_then(|line| {
257 line.trim()
258 .parse::<f64>()
259 .map_err(StreamErrorFor::<I>::other)
260 })
261 };
262 let boolean = || {
263 line().and_then(|line: &str| match line {
264 "t" => Ok(true),
265 "f" => Ok(false),
266 _ => Err(StreamErrorFor::<I>::message_static_message(
267 "Expected boolean, got garbage",
268 )),
269 })
270 };
271 let blob_error = || blob().map(|line| err_parser(&line));
272 let verbatim = || {
273 blob().and_then(|line| {
274 if let Some((format, text)) = line.split_once(':') {
275 let format = match format {
276 "txt" => VerbatimFormat::Text,
277 "mkd" => VerbatimFormat::Markdown,
278 x => VerbatimFormat::Unknown(x.to_string()),
279 };
280 Ok(Value::VerbatimString {
281 format,
282 text: text.to_string(),
283 })
284 } else {
285 Err(StreamErrorFor::<I>::message_static_message(
286 "parse error when decoding verbatim string",
287 ))
288 }
289 })
290 };
291 let big_number = || {
292 line().and_then(|line| {
293 BigInt::parse_bytes(line.as_bytes(), 10).ok_or_else(|| {
294 StreamErrorFor::<I>::message_static_message(
295 "Expected bigint, got garbage",
296 )
297 })
298 })
299 };
300 combine::dispatch!(b;
301 b'+' => simple_string(),
302 b':' => int().map(Value::Int),
303 b'$' => bulk_string(),
304 b'*' => array(),
305 b'%' => map(),
306 b'|' => attribute(),
307 b'~' => set(),
308 b'-' => error().map(Value::ServerError),
309 b'_' => null(),
310 b',' => double().map(Value::Double),
311 b'#' => boolean().map(Value::Boolean),
312 b'!' => blob_error().map(Value::ServerError),
313 b'=' => verbatim(),
314 b'(' => big_number().map(Value::BigNumber),
315 b'>' => push(),
316 b => combine::unexpected_any(combine::error::Token(b))
317 )
318 })
319 ))
320}
321
// Turns a `decoder::Error` into a `RedisError`.
//
// * A parse error that reports unexpected end of input becomes an
//   `io::ErrorKind::UnexpectedEof` I/O error.
// * Any other parse error is rendered to text, with positions translated relative to
//   `$decoder.buffer()`, and wrapped as `ErrorKind::ParseError`.
// * I/O errors are converted with `into()`.
macro_rules! to_redis_err {
    ($err: expr, $decoder: expr) => {
        match $err {
            decoder::Error::Parse(err) if err.is_unexpected_end_of_input() => {
                RedisError::from(io::Error::from(io::ErrorKind::UnexpectedEof))
            }
            decoder::Error::Parse(err) => {
                // NOTE(review): keep `map_range` ahead of `map_position`. The ranges
                // presumably borrow from the decoder's buffer, so they have to be turned
                // into owned strings before `$decoder.buffer()` is borrowed again.
                let detail = err
                    .map_range(|range| format!("{range:?}"))
                    .map_position(|pos| pos.translate_position($decoder.buffer()))
                    .to_string();
                RedisError::from((ErrorKind::ParseError, "parse error", detail))
            }
            decoder::Error::Io { error, .. } => error.into(),
        }
    };
}
341
/// Async parsing support; only compiled when the `aio` feature is enabled.
#[cfg(feature = "aio")]
mod aio_support {
    use super::*;

    use bytes::{Buf, BytesMut};
    use tokio::io::AsyncRead;
    // These explicit imports take precedence over the `Decoder` pulled in by `super::*`,
    // which is why the combine decoder is spelled with its full path further down.
    use tokio_util::codec::{Decoder, Encoder};

    /// Codec that decodes reply values from a byte buffer and writes already-encoded
    /// commands to it unchanged.
    #[derive(Default)]
    pub struct ValueCodec {
        // Partial parser state carried from one `decode` call to the next.
        state: AnySendSyncPartialState,
    }

    impl ValueCodec {
        /// Tries to parse one value from the front of `bytes`.
        ///
        /// `eof` states whether more input may still arrive; when it is `false` the
        /// buffer is treated as a partial stream. The number of bytes reported as consumed
        /// by the parser is removed from `bytes`. Returns `Ok(None)` when no complete
        /// value is available yet and an `ErrorKind::ParseError` error when the input is
        /// malformed.
        fn decode_stream(&mut self, bytes: &mut BytesMut, eof: bool) -> RedisResult<Option<Value>> {
            let (opt, removed_len) = {
                let buffer = &bytes[..];
                let mut stream =
                    combine::easy::Stream(combine::stream::MaybePartialStream(buffer, !eof));
                match combine::stream::decode_tokio(value(None), &mut stream, &mut self.state) {
                    Ok(x) => x,
                    Err(err) => {
                        let err = err
                            .map_position(|pos| pos.translate_position(buffer))
                            .map_range(|range| format!("{range:?}"))
                            .to_string();
                        return Err(RedisError::from((
                            ErrorKind::ParseError,
                            "parse error",
                            err,
                        )));
                    }
                }
            };

            bytes.advance(removed_len);
            Ok(opt)
        }
    }

    impl Encoder<Vec<u8>> for ValueCodec {
        type Error = RedisError;

        /// Appends `item` to `dst` byte for byte; never fails.
        fn encode(&mut self, item: Vec<u8>, dst: &mut BytesMut) -> Result<(), Self::Error> {
            dst.extend_from_slice(&item);
            Ok(())
        }
    }

    impl Decoder for ValueCodec {
        type Item = Value;
        type Error = RedisError;

        /// Decodes with more input still expected.
        fn decode(&mut self, bytes: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
            self.decode_stream(bytes, false)
        }

        /// Decodes with the input known to have ended.
        fn decode_eof(&mut self, bytes: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
            self.decode_stream(bytes, true)
        }
    }

    /// Reads from `read` through `decoder` until one complete value has been parsed.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `to_redis_err!` for I/O failures, unexpected end of
    /// input and malformed data.
    pub async fn parse_redis_value_async<R>(
        decoder: &mut combine::stream::Decoder<AnySendSyncPartialState, PointerOffset<[u8]>>,
        read: &mut R,
    ) -> RedisResult<Value>
    where
        R: AsyncRead + std::marker::Unpin,
    {
        let result = combine::decode_tokio!(*decoder, *read, value(None), |input, _| {
            combine::stream::easy::Stream::from(input)
        });
        // NOTE(review): deliberately a `match` rather than `map_err`; a closure would
        // presumably have to borrow `decoder` while the error still refers to its buffer.
        match result {
            Err(err) => Err(to_redis_err!(err, decoder)),
            Ok(result) => Ok(result),
        }
    }
}
423
424#[cfg(feature = "aio")]
425#[cfg_attr(docsrs, doc(cfg(feature = "aio")))]
426pub use self::aio_support::*;
427
428pub struct Parser {
430 decoder: Decoder<AnySendSyncPartialState, PointerOffset<[u8]>>,
431}
432
433impl Default for Parser {
434 fn default() -> Self {
435 Parser::new()
436 }
437}
438
439impl Parser {
444 pub fn new() -> Parser {
449 Parser {
450 decoder: Decoder::new(),
451 }
452 }
453
454 pub fn parse_value<T: Read>(&mut self, mut reader: T) -> RedisResult<Value> {
458 let mut decoder = &mut self.decoder;
459 let result = combine::decode!(decoder, reader, value(None), |input, _| {
460 combine::stream::easy::Stream::from(input)
461 });
462 match result {
463 Err(err) => Err(to_redis_err!(err, decoder)),
464 Ok(result) => Ok(result),
465 }
466 }
467}
468
469pub fn parse_redis_value(bytes: &[u8]) -> RedisResult<Value> {
474 let mut parser = Parser::new();
475 parser.parse_value(bytes)
476}
477
#[cfg(test)]
mod tests {
    use super::*;

    // Once the only value in the buffer has been decoded, further `decode_eof` calls
    // report `None`.
    #[cfg(feature = "aio")]
    #[test]
    fn decode_eof_returns_none_at_eof() {
        use tokio_util::codec::Decoder;
        let mut codec = ValueCodec::default();

        let mut bytes = bytes::BytesMut::from(&b"+GET 123\r\n"[..]);
        assert_eq!(
            codec.decode_eof(&mut bytes),
            Ok(Some(parse_redis_value(b"+GET 123\r\n").unwrap()))
        );
        assert_eq!(codec.decode_eof(&mut bytes), Ok(None));
        assert_eq!(codec.decode_eof(&mut bytes), Ok(None));
    }

    // A server error nested in an array is returned as a value, and the codec stays
    // usable afterwards.
    #[cfg(feature = "aio")]
    #[test]
    fn decode_eof_returns_error_inside_array_and_can_parse_more_inputs() {
        use tokio_util::codec::Decoder;
        let mut codec = ValueCodec::default();

        let mut bytes =
            bytes::BytesMut::from(b"*3\r\n+OK\r\n-LOADING server is loading\r\n+OK\r\n".as_slice());
        let result = codec.decode_eof(&mut bytes).unwrap().unwrap();

        assert_eq!(
            result,
            Value::Array(vec![
                Value::Okay,
                Value::ServerError(ServerError::KnownError {
                    kind: ServerErrorKind::BusyLoadingError,
                    detail: Some("server is loading".to_string())
                }),
                Value::Okay
            ])
        );

        let mut bytes = bytes::BytesMut::from(b"+OK\r\n".as_slice());
        let result = codec.decode_eof(&mut bytes).unwrap().unwrap();

        assert_eq!(result, Value::Okay);
    }

    #[test]
    fn parse_nested_error_and_handle_more_inputs() {
        let bytes = b"*3\r\n+OK\r\n-LOADING server is loading\r\n+OK\r\n";
        let result = parse_redis_value(bytes);

        assert_eq!(
            result.unwrap(),
            Value::Array(vec![
                Value::Okay,
                Value::ServerError(ServerError::KnownError {
                    kind: ServerErrorKind::BusyLoadingError,
                    detail: Some("server is loading".to_string())
                }),
                Value::Okay
            ])
        );

        let result = parse_redis_value(b"+OK\r\n").unwrap();

        assert_eq!(result, Value::Okay);
    }

    #[test]
    fn decode_resp3_double() {
        let val = parse_redis_value(b",1.23\r\n").unwrap();
        assert_eq!(val, Value::Double(1.23));
        let val = parse_redis_value(b",nan\r\n").unwrap();
        if let Value::Double(val) = val {
            assert!(val.is_sign_positive());
            assert!(val.is_nan());
        } else {
            panic!("expected double");
        }
        let val = parse_redis_value(b",-nan\r\n").unwrap();
        if let Value::Double(val) = val {
            assert!(val.is_sign_negative());
            assert!(val.is_nan());
        } else {
            panic!("expected double");
        }
        let val = parse_redis_value(b",2.67923e+8\r\n").unwrap();
        assert_eq!(val, Value::Double(267923000.0));
        let val = parse_redis_value(b",2.67923E+8\r\n").unwrap();
        assert_eq!(val, Value::Double(267923000.0));
        let val = parse_redis_value(b",-2.67923E+8\r\n").unwrap();
        assert_eq!(val, Value::Double(-267923000.0));
        let val = parse_redis_value(b",2.1E-2\r\n").unwrap();
        assert_eq!(val, Value::Double(0.021));

        let val = parse_redis_value(b",-inf\r\n").unwrap();
        assert_eq!(val, Value::Double(-f64::INFINITY));
        let val = parse_redis_value(b",inf\r\n").unwrap();
        assert_eq!(val, Value::Double(f64::INFINITY));
    }

    #[test]
    fn decode_resp3_map() {
        let val = parse_redis_value(b"%2\r\n+first\r\n:1\r\n+second\r\n:2\r\n").unwrap();
        let mut v = val.as_map_iter().unwrap();
        assert_eq!(
            (&Value::SimpleString("first".to_string()), &Value::Int(1)),
            v.next().unwrap()
        );
        assert_eq!(
            (&Value::SimpleString("second".to_string()), &Value::Int(2)),
            v.next().unwrap()
        );
    }

    #[test]
    fn decode_resp3_boolean() {
        let val = parse_redis_value(b"#t\r\n").unwrap();
        assert_eq!(val, Value::Boolean(true));
        let val = parse_redis_value(b"#f\r\n").unwrap();
        assert_eq!(val, Value::Boolean(false));
        let val = parse_redis_value(b"#x\r\n");
        assert!(val.is_err());
        let val = parse_redis_value(b"#\r\n");
        assert!(val.is_err());
    }

    #[test]
    fn decode_resp3_blob_error() {
        let val = parse_redis_value(b"!21\r\nSYNTAX invalid syntax\r\n");
        assert_eq!(
            val.unwrap(),
            Value::ServerError(ServerError::ExtensionError {
                code: "SYNTAX".to_string(),
                detail: Some("invalid syntax".to_string())
            })
        )
    }

    #[test]
    fn decode_resp3_big_number() {
        let val = parse_redis_value(b"(3492890328409238509324850943850943825024385\r\n").unwrap();
        assert_eq!(
            val,
            Value::BigNumber(
                BigInt::parse_bytes(b"3492890328409238509324850943850943825024385", 10).unwrap()
            )
        );
    }

    #[test]
    fn decode_resp3_set() {
        let val = parse_redis_value(b"~5\r\n+orange\r\n+apple\r\n#t\r\n:100\r\n:999\r\n").unwrap();
        let v = val.as_sequence().unwrap();
        assert_eq!(Value::SimpleString("orange".to_string()), v[0]);
        assert_eq!(Value::SimpleString("apple".to_string()), v[1]);
        assert_eq!(Value::Boolean(true), v[2]);
        assert_eq!(Value::Int(100), v[3]);
        assert_eq!(Value::Int(999), v[4]);
    }

    #[test]
    fn decode_resp3_push() {
        let val = parse_redis_value(b">3\r\n+message\r\n+somechannel\r\n+this is the message\r\n")
            .unwrap();
        if let Value::Push { ref kind, ref data } = val {
            assert_eq!(&PushKind::Message, kind);
            assert_eq!(Value::SimpleString("somechannel".to_string()), data[0]);
            assert_eq!(
                Value::SimpleString("this is the message".to_string()),
                data[1]
            );
        } else {
            panic!("Expected Value::Push")
        }
    }

    // Nesting up to MAX_RECURSE_DEPTH levels parses; one level more is a parse error.
    #[test]
    fn test_max_recursion_depth_set_and_array() {
        for test_byte in ["*", "~"] {
            let initial = format!("{test_byte}1\r\n").into_bytes();
            let end = format!("{test_byte}0\r\n").into_bytes();

            let mut ba = initial.repeat(MAX_RECURSE_DEPTH - 1);
            ba.extend_from_slice(&end);
            match parse_redis_value(&ba) {
                Ok(Value::Array(a)) => assert_eq!(a.len(), 1),
                Ok(Value::Set(s)) => assert_eq!(s.len(), 1),
                _ => panic!("Expected valid array or set"),
            }

            let mut ba = initial.repeat(MAX_RECURSE_DEPTH);
            ba.extend_from_slice(&end);
            match parse_redis_value(&ba) {
                Ok(_) => panic!("Expected ParseError"),
                Err(e) => assert!(matches!(e.kind(), ErrorKind::ParseError)),
            }
        }
    }

    // Same limit as above, with each level being a one-entry map whose value is the next
    // level.
    #[test]
    fn test_max_recursion_depth_map() {
        let initial = b"%1\r\n+a\r\n";
        let end = b"%0\r\n";

        let mut ba = initial.repeat(MAX_RECURSE_DEPTH - 1);
        ba.extend_from_slice(end);
        match parse_redis_value(&ba) {
            Ok(Value::Map(m)) => assert_eq!(m.len(), 1),
            _ => panic!("Expected valid map"),
        }

        let mut ba = initial.repeat(MAX_RECURSE_DEPTH);
        ba.extend_from_slice(end);
        match parse_redis_value(&ba) {
            Ok(_) => panic!("Expected ParseError"),
            Err(e) => assert!(matches!(e.kind(), ErrorKind::ParseError)),
        }
    }
}