combine/stream/
decoder.rs

1use crate::{
2    error::ParseError,
3    stream::buf_reader::{Buffer, Bufferless, CombineBuffer},
4};
5
6use std::{
7    fmt,
8    io::{self, Read},
9};
10
11#[cfg(feature = "pin-project-lite")]
12use std::pin::Pin;
13
/// Error type with two cases: a parse error of type `E`, or an [`io::Error`]
/// tagged with a position of type `P`.
#[derive(Debug)]
pub enum Error<E, P> {
    /// A parse error, stored as-is.
    Parse(E),
    /// An I/O error together with the position it is associated with.
    Io {
        /// Position value recorded alongside the I/O error.
        position: P,
        /// The underlying I/O error.
        error: io::Error,
    },
}
19
20impl<'a, P> From<Error<crate::easy::Errors<u8, &'a [u8], P>, P>>
21    for crate::easy::Errors<u8, &'a [u8], P>
22where
23    P: Ord + Clone,
24{
25    fn from(e: Error<crate::easy::Errors<u8, &'a [u8], P>, P>) -> Self {
26        match e {
27            Error::Parse(e) => e,
28            Error::Io { position, error } => {
29                crate::easy::Errors::from_error(position, crate::easy::Error::Other(error.into()))
30            }
31        }
32    }
33}
34
35impl<E, P> std::error::Error for Error<E, P>
36where
37    E: std::error::Error,
38    P: fmt::Display + fmt::Debug,
39{
40}
41
42impl<E: fmt::Display, P: fmt::Display> fmt::Display for Error<E, P> {
43    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44        match self {
45            Error::Parse(e) => e.fmt(f),
46            Error::Io { position: _, error } => error.fmt(f),
47        }
48    }
49}
50
51#[derive(Default)]
52/// Used together with the `decode!` macro
53pub struct Decoder<S, P, C = Buffer> {
54    position: P,
55    state: S,
56    buffer: C,
57    end_of_input: bool,
58}
59
60impl<S, P> Decoder<S, P, Buffer>
61where
62    P: Default,
63    S: Default,
64{
65    /// Constructs a new [`Decoder`] with an internal buffer. Allows any `AsyncRead/Read` instance to
66    /// be used when decoding but there may be data left in the internal buffer after decoding
67    /// (accessible with [`Decoder::buffer`])
68    pub fn new() -> Self {
69        Decoder::default()
70    }
71
72    /// Constructs a new [`Decoder`] with an internal buffer. Allows any `AsyncRead/Read` instance to
73    /// be used when decoding but there may be data left in the internal buffer after decoding
74    /// (accessible with [`Decoder::buffer`])
75    pub fn new_buffer() -> Self {
76        Decoder::new()
77    }
78}
79
80impl<S, P> Decoder<S, P, Bufferless>
81where
82    P: Default,
83    S: Default,
84{
85    /// Constructs a new `Decoder` without an internal buffer. Requires the read instance to be
86    /// wrapped with combine's [`BufReader`] instance to
87    ///
88    /// [`BufReader`]: super::buf_reader::BufReader
89    pub fn new_bufferless() -> Self {
90        Decoder::default()
91    }
92}
93
94impl<S, P> Decoder<S, P> {
95    pub fn buffer(&self) -> &[u8] {
96        &self.buffer.0
97    }
98}
99
100impl<S, P, C> Decoder<S, P, C> {
101    #[doc(hidden)]
102    pub fn advance<R>(&mut self, read: &mut R, removed: usize)
103    where
104        C: CombineBuffer<R>,
105    {
106        // Remove the data we have parsed and adjust `removed` to be the amount of data we
107        // committed from `self.reader`
108        self.buffer.advance(read, removed)
109    }
110
111    #[doc(hidden)]
112    #[cfg(feature = "pin-project-lite")]
113    pub fn advance_pin<R>(&mut self, read: Pin<&mut R>, removed: usize)
114    where
115        C: CombineBuffer<R>,
116    {
117        // Remove the data we have parsed and adjust `removed` to be the amount of data we
118        // committed from `self.reader`
119        self.buffer.advance_pin(read, removed);
120    }
121
122    pub fn position(&self) -> &P {
123        &self.position
124    }
125
126    #[doc(hidden)]
127    pub fn __inner(&mut self) -> (&mut S, &mut P, &C, bool) {
128        (
129            &mut self.state,
130            &mut self.position,
131            &self.buffer,
132            self.end_of_input,
133        )
134    }
135}
136
137impl<S, P, C> Decoder<S, P, C>
138where
139    C: ,
140{
141    #[doc(hidden)]
142    pub fn __before_parse<R>(&mut self, mut reader: R) -> io::Result<()>
143    where
144        R: Read,
145        C: crate::stream::buf_reader::CombineSyncRead<R>,
146    {
147        if self.buffer.extend_buf_sync(&mut reader)? == 0 {
148            self.end_of_input = true;
149        }
150
151        Ok(())
152    }
153}
154
#[cfg(feature = "tokio-02")]
impl<S, P, C> Decoder<S, P, C> {
    /// Asks the buffer to extend itself from a pinned tokio 0.2 `AsyncRead`.
    ///
    /// `poll_extend_buf` is driven to completion through `poll_fn`. When it
    /// reports `0`, the decoder's end-of-input flag is set. An I/O error is
    /// returned to the caller unchanged.
    #[doc(hidden)]
    pub async fn __before_parse_tokio_02<R>(&mut self, mut reader: Pin<&mut R>) -> io::Result<()>
    where
        R: tokio_02_dep::io::AsyncRead,
        C: crate::stream::buf_reader::CombineRead<R, dyn tokio_02_dep::io::AsyncRead>,
    {
        let bytes_read = crate::future_ext::poll_fn(|cx| {
            // Re-borrow the pinned reader for each poll so it can be polled again.
            let reader = reader.as_mut();
            self.buffer.poll_extend_buf(cx, reader)
        })
        .await?;

        if bytes_read == 0 {
            self.end_of_input = true;
        }

        Ok(())
    }
}
173
#[cfg(feature = "tokio-03")]
impl<S, P, C> Decoder<S, P, C> {
    /// Asks the buffer to extend itself from a pinned tokio 0.3 `AsyncRead`.
    ///
    /// `poll_extend_buf` is driven to completion through `poll_fn`. When it
    /// reports `0`, the decoder's end-of-input flag is set. An I/O error is
    /// returned to the caller unchanged.
    #[doc(hidden)]
    pub async fn __before_parse_tokio_03<R>(&mut self, mut reader: Pin<&mut R>) -> io::Result<()>
    where
        R: tokio_03_dep::io::AsyncRead,
        C: crate::stream::buf_reader::CombineRead<R, dyn tokio_03_dep::io::AsyncRead>,
    {
        let bytes_read = crate::future_ext::poll_fn(|cx| {
            // Re-borrow the pinned reader for each poll so it can be polled again.
            let reader = reader.as_mut();
            self.buffer.poll_extend_buf(cx, reader)
        })
        .await?;

        if bytes_read == 0 {
            self.end_of_input = true;
        }

        Ok(())
    }
}
192
#[cfg(feature = "tokio")]
impl<S, P, C> Decoder<S, P, C> {
    /// Asks the buffer to extend itself from a pinned tokio `AsyncRead`.
    ///
    /// `poll_extend_buf` is driven to completion through `poll_fn`. When it
    /// reports `0`, the decoder's end-of-input flag is set. An I/O error is
    /// returned to the caller unchanged.
    #[doc(hidden)]
    pub async fn __before_parse_tokio<R>(&mut self, mut reader: Pin<&mut R>) -> io::Result<()>
    where
        R: tokio_dep::io::AsyncRead,
        C: crate::stream::buf_reader::CombineRead<R, dyn tokio_dep::io::AsyncRead>,
    {
        let bytes_read = crate::future_ext::poll_fn(|cx| {
            // Re-borrow the pinned reader for each poll so it can be polled again.
            let reader = reader.as_mut();
            self.buffer.poll_extend_buf(cx, reader)
        })
        .await?;

        if bytes_read == 0 {
            self.end_of_input = true;
        }

        Ok(())
    }
}
211
#[cfg(feature = "futures-03")]
impl<S, P, C> Decoder<S, P, C> {
    /// Asks the buffer to extend itself from a pinned futures 0.3 `AsyncRead`.
    ///
    /// The buffer's `extend_buf` future is awaited. When it reports `0`, the
    /// decoder's end-of-input flag is set. An I/O error is returned to the caller
    /// unchanged.
    #[doc(hidden)]
    pub async fn __before_parse_async<R>(&mut self, reader: Pin<&mut R>) -> io::Result<()>
    where
        R: futures_io_03::AsyncRead,
        C: crate::stream::buf_reader::CombineAsyncRead<R>,
    {
        let bytes_read = self.buffer.extend_buf(reader).await?;
        if bytes_read == 0 {
            self.end_of_input = true;
        }

        Ok(())
    }
}