combine/stream/
buffered.rs

1use alloc::collections::VecDeque;
2
3use crate::{
4    error::StreamError,
5    stream::{ParseError, Positioned, ResetStream, StreamErrorFor, StreamOnce},
6};
7
8/// `Stream` which buffers items from an instance of `StreamOnce` into a ring buffer.
9/// Instances of `StreamOnce` which is not able to implement `ResetStream` (such as `ReadStream`) may
10/// use this as a way to implement `ResetStream` and become a full `Stream` instance.
11///
12/// The drawback is that the buffer only stores a limited number of items which limits how many
13/// tokens that can be reset and replayed. If a `buffered::Stream` is reset past this limit an error
14/// will be returned when `uncons` is next called.
15///
16/// NOTE: If this stream is used in conjunction with an error enhancing stream such as
17/// `easy::Stream` (also via the `easy_parser` method) it is recommended that the `buffered::Stream`
18/// instance wraps the `easy::Stream` instance instead of the other way around.
19///
20/// ```ignore
21/// // DO
22/// buffered::Stream::new(easy::Stream(..), ..)
23/// // DON'T
24/// easy::Stream(buffered::Stream::new(.., ..))
25/// parser.easy_parse(buffered::Stream::new(..));
26/// ```
27#[derive(Debug, PartialEq)]
28pub struct Stream<Input>
29where
30    Input: StreamOnce + Positioned,
31{
32    offset: usize,
33    iter: Input,
34    buffer_offset: usize,
35    buffer: VecDeque<(Input::Token, Input::Position)>,
36}
37
38impl<Input> ResetStream for Stream<Input>
39where
40    Input: Positioned,
41{
42    type Checkpoint = usize;
43
44    fn checkpoint(&self) -> Self::Checkpoint {
45        self.offset
46    }
47
48    fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), Self::Error> {
49        if checkpoint < self.buffer_offset - self.buffer.len() {
50            // We have backtracked to far
51            Err(Self::Error::from_error(
52                self.position(),
53                StreamErrorFor::<Self>::message_static_message("Backtracked to far"),
54            ))
55        } else {
56            self.offset = checkpoint;
57            Ok(())
58        }
59    }
60}
61
62impl<Input> Stream<Input>
63where
64    Input: StreamOnce + Positioned,
65    Input::Position: Clone,
66    Input::Token: Clone,
67{
68    /// Constructs a new `BufferedStream` from a `StreamOnce` instance with a `lookahead`
69    /// number of elements that can be stored in the buffer.
70    pub fn new(iter: Input, lookahead: usize) -> Stream<Input> {
71        Stream {
72            offset: 0,
73            iter,
74            buffer_offset: 0,
75            buffer: VecDeque::with_capacity(lookahead),
76        }
77    }
78}
79
80impl<Input> Positioned for Stream<Input>
81where
82    Input: StreamOnce + Positioned,
83{
84    #[inline]
85    fn position(&self) -> Self::Position {
86        if self.offset >= self.buffer_offset {
87            self.iter.position()
88        } else if self.offset < self.buffer_offset - self.buffer.len() {
89            self.buffer
90                .front()
91                .expect("At least 1 element in the buffer")
92                .1
93                .clone()
94        } else {
95            self.buffer[self.buffer.len() - (self.buffer_offset - self.offset)]
96                .1
97                .clone()
98        }
99    }
100}
101
102impl<Input> StreamOnce for Stream<Input>
103where
104    Input: StreamOnce + Positioned,
105    Input::Token: Clone,
106{
107    type Token = Input::Token;
108    type Range = Input::Range;
109    type Position = Input::Position;
110    type Error = Input::Error;
111
112    #[inline]
113    fn uncons(&mut self) -> Result<Input::Token, StreamErrorFor<Self>> {
114        if self.offset >= self.buffer_offset {
115            let position = self.iter.position();
116            let token = self.iter.uncons()?;
117            self.buffer_offset += 1;
118            // We want the VecDeque to only keep the last .capacity() elements so we need to remove
119            // an element if it gets to large
120            if self.buffer.len() == self.buffer.capacity() {
121                self.buffer.pop_front();
122            }
123            self.buffer.push_back((token.clone(), position));
124            self.offset += 1;
125            Ok(token)
126        } else if self.offset < self.buffer_offset - self.buffer.len() {
127            // We have backtracked to far
128            Err(StreamError::message_static_message("Backtracked to far"))
129        } else {
130            let value = self.buffer[self.buffer.len() - (self.buffer_offset - self.offset)]
131                .0
132                .clone();
133            self.offset += 1;
134            Ok(value)
135        }
136    }
137
138    fn is_partial(&self) -> bool {
139        self.iter.is_partial()
140    }
141}