combine/stream/
buffered.rs1use alloc::collections::VecDeque;
2
3use crate::{
4 error::StreamError,
5 stream::{ParseError, Positioned, ResetStream, StreamErrorFor, StreamOnce},
6};
7
/// Stream adapter that remembers the most recently read tokens of the wrapped
/// stream `iter`, so that parsing can be rewound by a limited number of tokens
/// even when `iter` itself cannot be rewound.
#[derive(Debug, PartialEq)]
pub struct Stream<Input>
where
    Input: StreamOnce + Positioned,
{
    /// Index (in tokens, counted from the start of the stream) of the next
    /// token that `uncons` will hand out. This value is also the checkpoint.
    offset: usize,
    /// The wrapped stream that new tokens are pulled from.
    iter: Input,
    /// Total number of tokens pulled from `iter` so far, i.e. the index one
    /// past the newest entry in `buffer`.
    buffer_offset: usize,
    /// Recently read tokens, oldest at the front, each paired with the
    /// position `iter` reported immediately before that token was read.
    buffer: VecDeque<(Input::Token, Input::Position)>,
}
37
38impl<Input> ResetStream for Stream<Input>
39where
40 Input: Positioned,
41{
42 type Checkpoint = usize;
43
44 fn checkpoint(&self) -> Self::Checkpoint {
45 self.offset
46 }
47
48 fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), Self::Error> {
49 if checkpoint < self.buffer_offset - self.buffer.len() {
50 Err(Self::Error::from_error(
52 self.position(),
53 StreamErrorFor::<Self>::message_static_message("Backtracked to far"),
54 ))
55 } else {
56 self.offset = checkpoint;
57 Ok(())
58 }
59 }
60}
61
62impl<Input> Stream<Input>
63where
64 Input: StreamOnce + Positioned,
65 Input::Position: Clone,
66 Input::Token: Clone,
67{
68 pub fn new(iter: Input, lookahead: usize) -> Stream<Input> {
71 Stream {
72 offset: 0,
73 iter,
74 buffer_offset: 0,
75 buffer: VecDeque::with_capacity(lookahead),
76 }
77 }
78}
79
80impl<Input> Positioned for Stream<Input>
81where
82 Input: StreamOnce + Positioned,
83{
84 #[inline]
85 fn position(&self) -> Self::Position {
86 if self.offset >= self.buffer_offset {
87 self.iter.position()
88 } else if self.offset < self.buffer_offset - self.buffer.len() {
89 self.buffer
90 .front()
91 .expect("At least 1 element in the buffer")
92 .1
93 .clone()
94 } else {
95 self.buffer[self.buffer.len() - (self.buffer_offset - self.offset)]
96 .1
97 .clone()
98 }
99 }
100}
101
102impl<Input> StreamOnce for Stream<Input>
103where
104 Input: StreamOnce + Positioned,
105 Input::Token: Clone,
106{
107 type Token = Input::Token;
108 type Range = Input::Range;
109 type Position = Input::Position;
110 type Error = Input::Error;
111
112 #[inline]
113 fn uncons(&mut self) -> Result<Input::Token, StreamErrorFor<Self>> {
114 if self.offset >= self.buffer_offset {
115 let position = self.iter.position();
116 let token = self.iter.uncons()?;
117 self.buffer_offset += 1;
118 if self.buffer.len() == self.buffer.capacity() {
121 self.buffer.pop_front();
122 }
123 self.buffer.push_back((token.clone(), position));
124 self.offset += 1;
125 Ok(token)
126 } else if self.offset < self.buffer_offset - self.buffer.len() {
127 Err(StreamError::message_static_message("Backtracked to far"))
129 } else {
130 let value = self.buffer[self.buffer.len() - (self.buffer_offset - self.offset)]
131 .0
132 .clone();
133 self.offset += 1;
134 Ok(value)
135 }
136 }
137
138 fn is_partial(&self) -> bool {
139 self.iter.is_partial()
140 }
141}