combine/stream/
mod.rs

//!
//! Traits and implementations of arbitrary data streams.
3//!
4//! Streams are similar to the `Iterator` trait in that they represent some sequential set of items
5//! which can be retrieved one by one. Where `Stream`s differ is that they are allowed to return
6//! errors instead of just `None` and if they implement the `RangeStreamOnce` trait they are also
7//! capable of returning multiple items at the same time, usually in the form of a slice.
8//!
//! In addition to the functionality above, a proper `Stream` usable by a `Parser` must also have a
10//! position (marked by the `Positioned` trait) and must also be resetable (marked by the
11//! `ResetStream` trait). The former is used to ensure that errors at different points in the stream
12//! aren't combined and the latter is used in parsers such as `or` to try multiple alternative
13//! parses.
14
15use crate::lib::{cmp::Ordering, fmt, marker::PhantomData, str::Chars};
16
17use crate::{
18    error::{
19        ParseError,
20        ParseResult::{self, *},
21        StreamError, StringStreamError, Tracked, UnexpectedParse,
22    },
23    Parser,
24};
25
26#[cfg(feature = "std")]
27pub use self::decoder::Decoder;
28
/// Implements `ResetStream` for a `Clone` stream type by using the stream itself as its
/// checkpoint.
///
/// Invoked as `clone_resetable! { (generic params) Type }`. The generated `checkpoint` clones
/// the stream and `reset` overwrites the stream with the saved clone, so `reset` always returns
/// `Ok(())`.
///
/// The traits are named through `$crate` so the expansion does not depend on `ResetStream` and
/// `StreamOnce` being imported at the call site.
#[doc(hidden)]
#[macro_export]
macro_rules! clone_resetable {
    (( $($params: tt)* ) $ty: ty) => {
        impl<$($params)*> $crate::stream::ResetStream for $ty
            where Self: $crate::stream::StreamOnce
        {
            type Checkpoint = Self;

            fn checkpoint(&self) -> Self {
                self.clone()
            }
            #[inline]
            fn reset(&mut self, checkpoint: Self) -> Result<(), Self::Error> {
                *self = checkpoint;
                Ok(())
            }
        }
    }
}
49
50#[cfg(feature = "std")]
51#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
52pub mod buf_reader;
53/// Stream wrapper which provides a `ResetStream` impl for `StreamOnce` impls which do not have
54/// one.
55#[cfg(feature = "alloc")]
56#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
57pub mod buffered;
58#[cfg(feature = "std")]
59#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
60pub mod easy;
61/// Stream wrapper which provides more detailed position information.
62pub mod position;
63/// Stream wrapper allowing `std::io::Read` to be used
64#[cfg(feature = "std")]
65#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
66pub mod read;
67pub mod span;
68/// Stream wrapper allowing custom state to be used.
69pub mod state;
70
71#[cfg(feature = "std")]
72#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
73pub mod decoder;
74
/// A type which has a position.
///
/// The position type is the `Position` associated type declared on `StreamOnce`.
pub trait Positioned: StreamOnce {
    /// Returns the current position of the stream.
    fn position(&self) -> Self::Position;
}
80
/// Convenience alias over the `StreamError` for the input stream `Input`.
///
/// It expands to the `StreamError` associated type of `Input::Error`, where `Input::Error` is
/// viewed as a `ParseError` over the stream's own `Token`, `Range` and `Position` types.
///
/// ```
/// #[macro_use]
/// extern crate combine;
/// use combine::{easy, Parser, Stream, many1};
/// use combine::parser::char::letter;
/// use combine::stream::StreamErrorFor;
/// use combine::error::{ParseError, StreamError};
///
/// parser!{
///    fn parser[Input]()(Input) -> String
///     where [ Input: Stream<Token = char>, ]
///     {
///         many1(letter()).and_then(|word: String| {
///             if word == "combine" {
///                 Ok(word)
///             } else {
///                 // The alias makes it easy to refer to the `StreamError` type of `Input`
///                 Err(StreamErrorFor::<Input>::expected_static_message("combine"))
///             }
///         })
///     }
/// }
///
/// fn main() {
/// }
/// ```
pub type StreamErrorFor<Input> = <<Input as StreamOnce>::Error as ParseError<
    <Input as StreamOnce>::Token,
    <Input as StreamOnce>::Range,
    <Input as StreamOnce>::Position,
>>::StreamError;
114
/// `StreamOnce` represents a sequence of items that can be extracted one by one.
pub trait StreamOnce {
    /// The type of items which is yielded from this stream.
    type Token: Clone;

    /// The type of a range of items yielded from this stream.
    /// Types which do not have a way of yielding ranges of items should just use the
    /// `Self::Token` for this type.
    type Range: Clone;

    /// Type which represents the position in a stream.
    /// `Ord` is required to allow parsers to determine which of two positions are further ahead.
    type Position: Clone + Ord;

    /// The error type used when parsing from this stream. It must implement `ParseError` for
    /// this stream's `Token`, `Range` and `Position` types.
    type Error: ParseError<Self::Token, Self::Range, Self::Position>;
    /// Takes a stream and removes its first token, yielding the token and the rest of the elements.
    /// Returns `Err` if no element could be retrieved.
    fn uncons(&mut self) -> Result<Self::Token, StreamErrorFor<Self>>;

    /// Returns `true` if this stream only contains partial input.
    ///
    /// The default implementation returns `false`.
    ///
    /// See `PartialStream`.
    fn is_partial(&self) -> bool {
        false
    }
}
141
/// A `StreamOnce` which can create checkpoints which the stream can be reset to
pub trait ResetStream: StreamOnce {
    /// Value capturing a point in the stream; produced by `checkpoint` and consumed by `reset`.
    type Checkpoint: Clone;

    /// Creates a `Checkpoint` at the current position which can be used to reset the stream
    /// later to the current position
    fn checkpoint(&self) -> Self::Checkpoint;
    /// Attempts to reset the stream to an earlier position.
    ///
    /// Returns `Err` with the stream's error type if the stream could not be reset.
    fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), Self::Error>;
}
152
// `ResetStream` impls for the stream types whose checkpoint is simply a clone of the stream.
clone_resetable! {('a) &'a str}
clone_resetable! {('a, T) &'a [T]}
clone_resetable! {('a, T) SliceStream<'a, T> }
clone_resetable! {(T: Clone) IteratorStream<T>}
157
/// A stream of tokens which can be duplicated
///
/// This is a trait over types which implement the `StreamOnce`, `ResetStream` and `Positioned`
/// traits. If you need a custom `Stream` object then implement those traits and `Stream` is
/// implemented automatically.
///
/// The trait declares no items of its own; it only bundles its supertraits.
pub trait Stream: StreamOnce + ResetStream + Positioned {}
164
165impl<Input> Stream for Input
166where
167    Input: StreamOnce + Positioned + ResetStream,
168{
169}
170
171#[inline]
172pub fn uncons<Input>(input: &mut Input) -> ParseResult<Input::Token, Input::Error>
173where
174    Input: ?Sized + Stream,
175{
176    match input.uncons() {
177        Ok(x) => CommitOk(x),
178        Err(err) => wrap_stream_error(input, err),
179    }
180}
181
182/// A `RangeStream` is an extension of `StreamOnce` which allows for zero copy parsing.
183pub trait RangeStreamOnce: StreamOnce + ResetStream {
184    /// Takes `size` elements from the stream.
185    /// Fails if the length of the stream is less than `size`.
186    fn uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>>;
187
188    /// Takes items from stream, testing each one with `predicate`.
189    /// returns the range of items which passed `predicate`.
190    fn uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>>
191    where
192        F: FnMut(Self::Token) -> bool;
193
194    #[inline]
195    /// Takes items from stream, testing each one with `predicate`
196    /// returns a range of at least one items which passed `predicate`.
197    ///
198    /// # Note
199    ///
200    /// This may not return `PeekOk` as it should uncons at least one token.
201    fn uncons_while1<F>(&mut self, mut f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
202    where
203        F: FnMut(Self::Token) -> bool,
204    {
205        let mut committed = false;
206        let mut started_at_eoi = true;
207        let result = self.uncons_while(|c| {
208            let ok = f(c);
209            committed |= ok;
210            started_at_eoi = false;
211            ok
212        });
213        if committed {
214            match result {
215                Ok(x) => CommitOk(x),
216                Err(x) => CommitErr(x),
217            }
218        } else if started_at_eoi {
219            PeekErr(Tracked::from(StreamErrorFor::<Self>::end_of_input()))
220        } else {
221            PeekErr(Tracked::from(
222                StreamErrorFor::<Self>::unexpected_static_message(""),
223            ))
224        }
225    }
226
227    /// Returns the distance between `self` and `end`. The returned `usize` must be so that
228    ///
229    /// ```ignore
230    /// let start = stream.checkpoint();
231    /// stream.uncons_range(distance);
232    /// stream.distance(&start) == distance
233    /// ```
234    fn distance(&self, end: &Self::Checkpoint) -> usize;
235
236    /// Returns the entire range of `self`
237    fn range(&self) -> Self::Range;
238}
239
/// A `RangeStream` is an extension of `Stream` which allows for zero copy parsing.
///
/// It declares no items of its own; it bundles `Stream` and `RangeStreamOnce`.
pub trait RangeStream: Stream + RangeStreamOnce {}
242
243impl<Input> RangeStream for Input where Input: RangeStreamOnce + Stream {}
244
245#[doc(hidden)]
246pub fn wrap_stream_error<T, Input>(
247    input: &Input,
248    err: <Input::Error as ParseError<Input::Token, Input::Range, Input::Position>>::StreamError,
249) -> ParseResult<T, <Input as StreamOnce>::Error>
250where
251    Input: ?Sized + StreamOnce + Positioned,
252{
253    let err = Input::Error::from_error(input.position(), err);
254    if input.is_partial() {
255        CommitErr(err)
256    } else {
257        PeekErr(err.into())
258    }
259}
260
261#[inline]
262pub fn uncons_range<Input>(
263    input: &mut Input,
264    size: usize,
265) -> ParseResult<Input::Range, <Input as StreamOnce>::Error>
266where
267    Input: ?Sized + RangeStream,
268{
269    match input.uncons_range(size) {
270        Err(err) => wrap_stream_error(input, err),
271        Ok(x) => {
272            if size == 0 {
273                PeekOk(x)
274            } else {
275                CommitOk(x)
276            }
277        }
278    }
279}
280
281#[doc(hidden)]
282pub fn input_at_eof<Input>(input: &mut Input) -> bool
283where
284    Input: ?Sized + Stream,
285{
286    let before = input.checkpoint();
287    let x = input
288        .uncons()
289        .err()
290        .map_or(false, |err| err.is_unexpected_end_of_input());
291    input.reset(before).is_ok() && x
292}
293
294/// Removes items from the input while `predicate` returns `true`.
295#[inline]
296pub fn uncons_while<Input, F>(
297    input: &mut Input,
298    predicate: F,
299) -> ParseResult<Input::Range, Input::Error>
300where
301    F: FnMut(Input::Token) -> bool,
302    Input: ?Sized + RangeStream,
303    Input::Range: Range,
304{
305    match input.uncons_while(predicate) {
306        Err(err) => wrap_stream_error(input, err),
307        Ok(x) => {
308            if input.is_partial() && input_at_eof(input) {
309                // Partial inputs which encounter end of file must fail to let more input be
310                // retrieved
311                CommitErr(Input::Error::from_error(
312                    input.position(),
313                    StreamError::end_of_input(),
314                ))
315            } else if x.len() == 0 {
316                PeekOk(x)
317            } else {
318                CommitOk(x)
319            }
320        }
321    }
322}
323
324#[inline]
325/// Takes items from stream, testing each one with `predicate`
326/// returns a range of at least one items which passed `predicate`.
327///
328/// # Note
329///
330/// This may not return `PeekOk` as it should uncons at least one token.
331pub fn uncons_while1<Input, F>(
332    input: &mut Input,
333    predicate: F,
334) -> ParseResult<Input::Range, Input::Error>
335where
336    F: FnMut(Input::Token) -> bool,
337    Input: ?Sized + RangeStream,
338{
339    match input.uncons_while1(predicate) {
340        CommitOk(x) => {
341            if input.is_partial() && input_at_eof(input) {
342                // Partial inputs which encounter end of file must fail to let more input be
343                // retrieved
344                CommitErr(Input::Error::from_error(
345                    input.position(),
346                    StreamError::end_of_input(),
347                ))
348            } else {
349                CommitOk(x)
350            }
351        }
352        PeekErr(_) => {
353            if input.is_partial() && input_at_eof(input) {
354                // Partial inputs which encounter end of file must fail to let more input be
355                // retrieved
356                CommitErr(Input::Error::from_error(
357                    input.position(),
358                    StreamError::end_of_input(),
359                ))
360            } else {
361                PeekErr(Input::Error::empty(input.position()).into())
362            }
363        }
364        CommitErr(err) => {
365            if input.is_partial() && input_at_eof(input) {
366                // Partial inputs which encounter end of file must fail to let more input be
367                // retrieved
368                CommitErr(Input::Error::from_error(
369                    input.position(),
370                    StreamError::end_of_input(),
371                ))
372            } else {
373                wrap_stream_error(input, err)
374            }
375        }
376        PeekOk(_) => unreachable!(),
377    }
378}
379
/// Trait representing a range of elements.
pub trait Range {
    /// Returns the remaining length of `self`.
    /// The returned length need not be the same as the number of items left in the stream.
    fn len(&self) -> usize;

    /// Returns `true` if the range does not contain any elements (`Range::len() == 0`)
    ///
    /// The default implementation compares `self.len()` with zero.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
391
392impl<'a, I> StreamOnce for &'a mut I
393where
394    I: StreamOnce + ?Sized,
395{
396    type Token = I::Token;
397
398    type Range = I::Range;
399
400    type Position = I::Position;
401
402    type Error = I::Error;
403    fn uncons(&mut self) -> Result<Self::Token, StreamErrorFor<Self>> {
404        (**self).uncons()
405    }
406
407    fn is_partial(&self) -> bool {
408        (**self).is_partial()
409    }
410}
411
412impl<'a, I> Positioned for &'a mut I
413where
414    I: Positioned + ?Sized,
415{
416    #[inline]
417    fn position(&self) -> Self::Position {
418        (**self).position()
419    }
420}
421
422impl<'a, I> ResetStream for &'a mut I
423where
424    I: ResetStream + ?Sized,
425{
426    type Checkpoint = I::Checkpoint;
427
428    fn checkpoint(&self) -> Self::Checkpoint {
429        (**self).checkpoint()
430    }
431
432    fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), Self::Error> {
433        (**self).reset(checkpoint)
434    }
435}
436
437impl<'a, I> RangeStreamOnce for &'a mut I
438where
439    I: RangeStreamOnce + ?Sized,
440{
441    #[inline]
442    fn uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>>
443    where
444        F: FnMut(Self::Token) -> bool,
445    {
446        (**self).uncons_while(f)
447    }
448
449    #[inline]
450    fn uncons_while1<F>(&mut self, f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
451    where
452        F: FnMut(Self::Token) -> bool,
453    {
454        (**self).uncons_while1(f)
455    }
456
457    #[inline]
458    fn uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>> {
459        (**self).uncons_range(size)
460    }
461
462    #[inline]
463    fn distance(&self, end: &Self::Checkpoint) -> usize {
464        (**self).distance(end)
465    }
466
467    fn range(&self) -> Self::Range {
468        (**self).range()
469    }
470}
471
472impl<'a, I> Range for &'a mut I
473where
474    I: Range + ?Sized,
475{
476    fn len(&self) -> usize {
477        (**self).len()
478    }
479}
480
481impl<'a> StreamOnce for &'a str {
482    type Token = char;
483    type Range = &'a str;
484    type Position = PointerOffset<str>;
485    type Error = StringStreamError;
486
487    #[inline]
488    fn uncons(&mut self) -> Result<char, StreamErrorFor<Self>> {
489        let mut chars = self.chars();
490        match chars.next() {
491            Some(c) => {
492                *self = chars.as_str();
493                Ok(c)
494            }
495            None => Err(StringStreamError::Eoi),
496        }
497    }
498}
499
500impl<'a> Positioned for &'a str {
501    #[inline]
502    fn position(&self) -> Self::Position {
503        PointerOffset::new(self.as_bytes().position().0)
504    }
505}
506
/// Splits off the prefix of `slice` that `f` accepts and advances `slice` past it.
///
/// `chars` must iterate over (a suffix of) `*slice`. Characters of `*slice` that `chars` has
/// already stepped over are included in the returned prefix without being tested. Characters
/// are pulled from `chars` until `f` rejects one or the iterator is exhausted; the rejected
/// character stays in `slice`.
#[allow(clippy::while_let_loop)]
fn str_uncons_while<'a, F>(slice: &mut &'a str, mut chars: Chars<'a>, mut f: F) -> &'a str
where
    F: FnMut(char) -> bool,
{
    // UTF-8 width of the rejected character (0 when the iterator simply ran out). The iterator
    // has already moved past that character, so its width must be counted as unread.
    let mut rejected_width = 0;

    macro_rules! step {
        () => {
            if let Some(ch) = chars.next() {
                if !f(ch) {
                    rejected_width = ch.len_utf8();
                    break;
                }
            } else {
                break;
            }
        };
    }
    // Manually unrolled: eight tests per loop iteration.
    loop {
        step!();
        step!();
        step!();
        step!();
        step!();
        step!();
        step!();
        step!();
    }

    let unread = chars.as_str().len() + rejected_width;
    let consumed = slice.len() - unread;
    let (taken, rest) = slice.split_at(consumed);
    *slice = rest;
    taken
}
543
544impl<'a> RangeStreamOnce for &'a str {
545    fn uncons_while<F>(&mut self, f: F) -> Result<&'a str, StreamErrorFor<Self>>
546    where
547        F: FnMut(Self::Token) -> bool,
548    {
549        Ok(str_uncons_while(self, self.chars(), f))
550    }
551
552    #[inline]
553    fn uncons_while1<F>(&mut self, mut f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
554    where
555        F: FnMut(Self::Token) -> bool,
556    {
557        let mut chars = self.chars();
558        match chars.next() {
559            Some(c) => {
560                if !f(c) {
561                    return PeekErr(Tracked::from(StringStreamError::UnexpectedParse));
562                }
563            }
564            None => return PeekErr(Tracked::from(StringStreamError::Eoi)),
565        }
566
567        CommitOk(str_uncons_while(self, chars, f))
568    }
569
570    #[inline]
571    fn uncons_range(&mut self, size: usize) -> Result<&'a str, StreamErrorFor<Self>> {
572        fn is_char_boundary(s: &str, index: usize) -> bool {
573            if index == s.len() {
574                return true;
575            }
576            match s.as_bytes().get(index) {
577                None => false,
578                Some(b) => !(128..=192).contains(b),
579            }
580        }
581        if size <= self.len() {
582            if is_char_boundary(self, size) {
583                let (result, remaining) = self.split_at(size);
584                *self = remaining;
585                Ok(result)
586            } else {
587                Err(StringStreamError::CharacterBoundary)
588            }
589        } else {
590            Err(StringStreamError::Eoi)
591        }
592    }
593
594    #[inline]
595    fn distance(&self, end: &Self) -> usize {
596        self.position().0 - end.position().0
597    }
598
599    fn range(&self) -> Self::Range {
600        self
601    }
602}
603
/// A string slice used as a range; its length is measured in bytes.
impl<'a> Range for &'a str {
    #[inline]
    fn len(&self) -> usize {
        // Calls the inherent `str::len` by path.
        str::len(self)
    }
}
610
/// A slice used as a range; its length is the number of elements.
impl<'a, T> Range for &'a [T] {
    #[inline]
    fn len(&self) -> usize {
        // Calls the inherent slice `len` by path.
        <[T]>::len(self)
    }
}
617
/// Index at which the slice `uncons_while` helpers start testing elements.
#[repr(usize)]
enum UnconsStart {
    /// Test every element, starting with the first.
    Zero = 0,
    /// Skip the first element; the caller has already tested it and it is part of the result.
    One = 1,
}

/// Splits off the prefix of `slice` accepted by `f` and advances `slice` past it.
///
/// Testing begins at index `start`; elements before that index are included in the returned
/// prefix untested. `f` receives clones of the elements and is called on consecutive elements
/// until it returns `false` or the slice ends. The rejected element stays in `slice`.
///
/// # Panics
///
/// Panics if `start` is `One` and `slice` is empty.
fn slice_uncons_while<'a, T, F>(slice: &mut &'a [T], start: UnconsStart, mut f: F) -> &'a [T]
where
    F: FnMut(T) -> bool,
    T: Clone,
{
    let mut i = start as usize;
    let len = slice.len();
    debug_assert!(len >= i, "`UnconsStart::One` requires a non-empty slice");
    let mut found = false;

    macro_rules! check {
        () => {
            // SAFETY: only expanded inside the unrolled loop below. Its condition guarantees
            // `i + 8 <= len` at the top of every iteration and each of the eight expansions
            // advances `i` by at most one, so `i < len` holds here.
            if !f(unsafe { slice.get_unchecked(i).clone() }) {
                found = true;
                break;
            }
            i += 1;
        };
    }

    // `saturating_sub` keeps the unchecked reads unreachable even if the `len >= i`
    // precondition is violated in a build without debug assertions (a plain `len - i` would
    // wrap around and enter the loop).
    while len.saturating_sub(i) >= 8 {
        check!();
        check!();
        check!();
        check!();
        check!();
        check!();
        check!();
        check!();
    }

    // Fewer than eight elements remain: finish with bounds-checked accesses, unless the
    // unrolled loop already stopped at a rejected element.
    if !found {
        while let Some(c) = slice.get(i) {
            if !f(c.clone()) {
                break;
            }
            i += 1;
        }
    }

    let (result, remaining) = slice.split_at(i);
    *slice = remaining;
    result
}
670
671impl<'a, T> RangeStreamOnce for &'a [T]
672where
673    T: Clone + PartialEq,
674{
675    #[inline]
676    fn uncons_range(&mut self, size: usize) -> Result<&'a [T], StreamErrorFor<Self>> {
677        if size <= self.len() {
678            let (result, remaining) = self.split_at(size);
679            *self = remaining;
680            Ok(result)
681        } else {
682            Err(UnexpectedParse::Eoi)
683        }
684    }
685
686    #[inline]
687    fn uncons_while<F>(&mut self, f: F) -> Result<&'a [T], StreamErrorFor<Self>>
688    where
689        F: FnMut(Self::Token) -> bool,
690    {
691        Ok(slice_uncons_while(self, UnconsStart::Zero, f))
692    }
693
694    #[inline]
695    fn uncons_while1<F>(&mut self, mut f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
696    where
697        F: FnMut(Self::Token) -> bool,
698    {
699        match self.first() {
700            Some(c) => {
701                if !f(c.clone()) {
702                    return PeekErr(Tracked::from(UnexpectedParse::Unexpected));
703                }
704            }
705            None => {
706                return PeekErr(Tracked::from(UnexpectedParse::Eoi));
707            }
708        }
709
710        CommitOk(slice_uncons_while(self, UnconsStart::One, f))
711    }
712
713    #[inline]
714    fn distance(&self, end: &Self) -> usize {
715        end.len() - self.len()
716    }
717
718    fn range(&self) -> Self::Range {
719        self
720    }
721}
722
723impl<'a, T> Positioned for &'a [T]
724where
725    T: Clone + PartialEq,
726{
727    #[inline]
728    fn position(&self) -> Self::Position {
729        PointerOffset::new(self.as_ptr() as usize)
730    }
731}
732
733impl<'a, T> StreamOnce for &'a [T]
734where
735    T: Clone + PartialEq,
736{
737    type Token = T;
738    type Range = &'a [T];
739    type Position = PointerOffset<[T]>;
740    type Error = UnexpectedParse;
741
742    #[inline]
743    fn uncons(&mut self) -> Result<T, StreamErrorFor<Self>> {
744        match self.split_first() {
745            Some((first, rest)) => {
746                *self = rest;
747                Ok(first.clone())
748            }
749            None => Err(UnexpectedParse::Eoi),
750        }
751    }
752}
753
/// Stream type which indicates that the stream is partial if end of input is reached
///
/// The wrapped stream is the public field `0`.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug)]
pub struct PartialStream<S>(pub S);

/// Wraps any value in a `PartialStream`.
impl<S> From<S> for PartialStream<S> {
    fn from(stream: S) -> Self {
        Self(stream)
    }
}
763
764impl<S> Positioned for PartialStream<S>
765where
766    S: Positioned,
767{
768    #[inline]
769    fn position(&self) -> Self::Position {
770        self.0.position()
771    }
772}
773
774impl<S> ResetStream for PartialStream<S>
775where
776    S: ResetStream,
777{
778    type Checkpoint = S::Checkpoint;
779
780    #[inline]
781    fn checkpoint(&self) -> Self::Checkpoint {
782        self.0.checkpoint()
783    }
784
785    #[inline]
786    fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), S::Error> {
787        self.0.reset(checkpoint)
788    }
789}
790
791impl<S> StreamOnce for PartialStream<S>
792where
793    S: StreamOnce,
794{
795    type Token = S::Token;
796    type Range = S::Range;
797    type Position = S::Position;
798    type Error = S::Error;
799
800    #[inline]
801    fn uncons(&mut self) -> Result<S::Token, StreamErrorFor<Self>> {
802        self.0.uncons()
803    }
804
805    fn is_partial(&self) -> bool {
806        true
807    }
808}
809
810impl<S> RangeStreamOnce for PartialStream<S>
811where
812    S: RangeStreamOnce,
813{
814    #[inline]
815    fn uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>> {
816        self.0.uncons_range(size)
817    }
818
819    #[inline]
820    fn uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>>
821    where
822        F: FnMut(Self::Token) -> bool,
823    {
824        self.0.uncons_while(f)
825    }
826
827    fn uncons_while1<F>(&mut self, f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
828    where
829        F: FnMut(Self::Token) -> bool,
830    {
831        self.0.uncons_while1(f)
832    }
833
834    #[inline]
835    fn distance(&self, end: &Self::Checkpoint) -> usize {
836        self.0.distance(end)
837    }
838
839    #[inline]
840    fn range(&self) -> Self::Range {
841        self.0.range()
842    }
843}
844
/// Stream type which indicates that the stream is complete if end of input is reached
///
/// For most streams this is already the default but this wrapper can be used to override a nested
/// `PartialStream`
///
/// The type is `#[repr(transparent)]` over the wrapped stream, which the reference conversion
/// below relies on.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug)]
#[repr(transparent)]
pub struct CompleteStream<S>(pub S);

/// Wraps any value in a `CompleteStream`.
impl<S> From<S> for CompleteStream<S> {
    fn from(stream: S) -> Self {
        Self(stream)
    }
}

/// Reinterprets a mutable reference to a stream as a mutable reference to the wrapper, without
/// moving the stream.
impl<'s, S> From<&'s mut S> for &'s mut CompleteStream<S> {
    fn from(stream: &'s mut S) -> Self {
        let wrapper = stream as *mut S as *mut CompleteStream<S>;
        // SAFETY: `CompleteStream<S>` is `#[repr(transparent)]` over its single field of type
        // `S`, and `wrapper` comes from a unique `&'s mut S`, so it is valid to use as a
        // `&'s mut CompleteStream<S>`.
        unsafe { &mut *wrapper }
    }
}
865
866impl<S> Positioned for CompleteStream<S>
867where
868    S: Positioned,
869{
870    #[inline]
871    fn position(&self) -> Self::Position {
872        self.0.position()
873    }
874}
875
876impl<S> ResetStream for CompleteStream<S>
877where
878    S: ResetStream,
879{
880    type Checkpoint = S::Checkpoint;
881
882    #[inline]
883    fn checkpoint(&self) -> Self::Checkpoint {
884        self.0.checkpoint()
885    }
886
887    #[inline]
888    fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), S::Error> {
889        self.0.reset(checkpoint)
890    }
891}
892
893impl<S> StreamOnce for CompleteStream<S>
894where
895    S: StreamOnce,
896{
897    type Token = S::Token;
898    type Range = S::Range;
899    type Position = S::Position;
900    type Error = S::Error;
901
902    #[inline]
903    fn uncons(&mut self) -> Result<S::Token, StreamErrorFor<Self>> {
904        self.0.uncons()
905    }
906
907    fn is_partial(&self) -> bool {
908        false
909    }
910}
911
912impl<S> RangeStreamOnce for CompleteStream<S>
913where
914    S: RangeStreamOnce,
915{
916    #[inline]
917    fn uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>> {
918        self.0.uncons_range(size)
919    }
920
921    #[inline]
922    fn uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>>
923    where
924        F: FnMut(Self::Token) -> bool,
925    {
926        self.0.uncons_while(f)
927    }
928
929    fn uncons_while1<F>(&mut self, f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
930    where
931        F: FnMut(Self::Token) -> bool,
932    {
933        self.0.uncons_while1(f)
934    }
935
936    #[inline]
937    fn distance(&self, end: &Self::Checkpoint) -> usize {
938        self.0.distance(end)
939    }
940
941    #[inline]
942    fn range(&self) -> Self::Range {
943        self.0.range()
944    }
945}
946
/// Stream wrapper whose partial/complete status is chosen at runtime.
///
/// Field `0` is the wrapped stream; field `1` is the value reported by `is_partial`.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug)]
pub struct MaybePartialStream<S>(pub S, pub bool);
949
950impl<S> Positioned for MaybePartialStream<S>
951where
952    S: Positioned,
953{
954    #[inline]
955    fn position(&self) -> Self::Position {
956        self.0.position()
957    }
958}
959
960impl<S> ResetStream for MaybePartialStream<S>
961where
962    S: ResetStream,
963{
964    type Checkpoint = S::Checkpoint;
965
966    #[inline]
967    fn checkpoint(&self) -> Self::Checkpoint {
968        self.0.checkpoint()
969    }
970
971    #[inline]
972    fn reset(&mut self, checkpoint: Self::Checkpoint) -> Result<(), S::Error> {
973        self.0.reset(checkpoint)
974    }
975}
976
977impl<S> StreamOnce for MaybePartialStream<S>
978where
979    S: StreamOnce,
980{
981    type Token = S::Token;
982    type Range = S::Range;
983    type Position = S::Position;
984    type Error = S::Error;
985
986    #[inline]
987    fn uncons(&mut self) -> Result<S::Token, StreamErrorFor<Self>> {
988        self.0.uncons()
989    }
990
991    fn is_partial(&self) -> bool {
992        self.1
993    }
994}
995
996impl<S> RangeStreamOnce for MaybePartialStream<S>
997where
998    S: RangeStreamOnce,
999{
1000    #[inline]
1001    fn uncons_range(&mut self, size: usize) -> Result<Self::Range, StreamErrorFor<Self>> {
1002        self.0.uncons_range(size)
1003    }
1004
1005    #[inline]
1006    fn uncons_while<F>(&mut self, f: F) -> Result<Self::Range, StreamErrorFor<Self>>
1007    where
1008        F: FnMut(Self::Token) -> bool,
1009    {
1010        self.0.uncons_while(f)
1011    }
1012
1013    fn uncons_while1<F>(&mut self, f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
1014    where
1015        F: FnMut(Self::Token) -> bool,
1016    {
1017        self.0.uncons_while1(f)
1018    }
1019
1020    #[inline]
1021    fn distance(&self, end: &Self::Checkpoint) -> usize {
1022        self.0.distance(end)
1023    }
1024
1025    #[inline]
1026    fn range(&self) -> Self::Range {
1027        self.0.range()
1028    }
1029}
1030
/// Newtype for constructing a stream from a slice where the items in the slice are not copyable.
///
/// The wrapped slice is the public field `0`.
#[derive(Copy, Eq, PartialEq, Ord, PartialOrd, Debug)]
pub struct SliceStream<'a, T>(pub &'a [T]);

/// Cloning copies the slice reference only, so no `T: Clone` bound is needed.
impl<'a, T> Clone for SliceStream<'a, T> {
    fn clone(&self) -> Self {
        let SliceStream(items) = *self;
        SliceStream(items)
    }
}
1040
1041impl<'a, T> Positioned for SliceStream<'a, T>
1042where
1043    T: PartialEq + 'a,
1044{
1045    #[inline]
1046    fn position(&self) -> Self::Position {
1047        PointerOffset::new(self.0.as_ptr() as usize)
1048    }
1049}
1050
1051impl<'a, T> StreamOnce for SliceStream<'a, T>
1052where
1053    T: PartialEq + 'a,
1054{
1055    type Token = &'a T;
1056    type Range = &'a [T];
1057    type Position = PointerOffset<[T]>;
1058    type Error = UnexpectedParse;
1059
1060    #[inline]
1061    fn uncons(&mut self) -> Result<&'a T, StreamErrorFor<Self>> {
1062        match self.0.split_first() {
1063            Some((first, rest)) => {
1064                self.0 = rest;
1065                Ok(first)
1066            }
1067            None => Err(UnexpectedParse::Eoi),
1068        }
1069    }
1070}
1071
/// Splits off and returns the leading part of `slice` whose items satisfy `f`, leaving `slice`
/// pointing at the remainder.
///
/// The first `start as usize` items are included in the returned prefix without being passed
/// to `f`; testing begins at that index and stops at the first item `f` rejects or at the end
/// of the slice.
///
/// The bulk of the work is done by a loop which tests eight items per iteration using
/// unchecked indexing; the remaining (fewer than eight) items are handled by a bounds-checked
/// loop.
fn slice_uncons_while_ref<'a, T, F>(slice: &mut &'a [T], start: UnconsStart, mut f: F) -> &'a [T]
where
    F: FnMut(&'a T) -> bool,
{
    // Index of the next item to test; also the length of the prefix accepted so far.
    let mut i = start as usize;
    let len = slice.len();
    // SAFETY: We only call this function with `One` if the slice has length >= 1
    // NOTE(review): this precondition is only verified in debug builds; `len - i` below relies
    // on it holding.
    debug_assert!(len >= i, "");
    // Set when `f` rejects an item inside the unrolled loop so that the tail loop is skipped.
    let mut found = false;

    // Tests the item at index `i`: on rejection records it and leaves the enclosing loop,
    // otherwise advances `i` to the next item.
    macro_rules! check {
        () => {
            if !f(unsafe { slice.get_unchecked(i) }) {
                found = true;
                break;
            }
            i += 1;
        };
    }

    // SAFETY: ensures we can access at least 8 elements starting at i, making get_unchecked sound.
    while len - i >= 8 {
        check!();
        check!();
        check!();
        check!();
        check!();
        check!();
        check!();
        check!();
    }

    // Fewer than eight items are left: finish with bounds-checked accesses.
    if !found {
        while let Some(c) = slice.get(i) {
            if !f(c) {
                break;
            }
            i += 1;
        }
    }

    // Everything before `i` was accepted; the rest stays in the stream.
    let (result, remaining) = slice.split_at(i);
    *slice = remaining;
    result
}
1117
1118impl<'a, T> RangeStreamOnce for SliceStream<'a, T>
1119where
1120    T: PartialEq + 'a,
1121{
1122    #[inline]
1123    fn uncons_range(&mut self, size: usize) -> Result<&'a [T], StreamErrorFor<Self>> {
1124        if size <= self.0.len() {
1125            let (range, rest) = self.0.split_at(size);
1126            self.0 = rest;
1127            Ok(range)
1128        } else {
1129            Err(UnexpectedParse::Eoi)
1130        }
1131    }
1132
1133    #[inline]
1134    fn uncons_while<F>(&mut self, f: F) -> Result<&'a [T], StreamErrorFor<Self>>
1135    where
1136        F: FnMut(Self::Token) -> bool,
1137    {
1138        Ok(slice_uncons_while_ref(&mut self.0, UnconsStart::Zero, f))
1139    }
1140
1141    #[inline]
1142    fn uncons_while1<F>(&mut self, mut f: F) -> ParseResult<Self::Range, StreamErrorFor<Self>>
1143    where
1144        F: FnMut(Self::Token) -> bool,
1145    {
1146        match self.0.first() {
1147            Some(c) => {
1148                if !f(c) {
1149                    return PeekErr(Tracked::from(UnexpectedParse::Unexpected));
1150                }
1151            }
1152            None => return PeekErr(Tracked::from(UnexpectedParse::Eoi)),
1153        }
1154
1155        CommitOk(slice_uncons_while_ref(&mut self.0, UnconsStart::One, f))
1156    }
1157
1158    #[inline]
1159    fn distance(&self, end: &Self) -> usize {
1160        end.0.len() - self.0.len()
1161    }
1162
1163    fn range(&self) -> Self::Range {
1164        self.0
1165    }
1166}
1167
/// Wrapper around iterators which allows them to be treated as a stream.
/// Returned by [`IteratorStream::new`].
#[derive(Copy, Clone, Debug)]
pub struct IteratorStream<Input>(Input);

impl<Input> IteratorStream<Input>
where
    Input: Iterator,
{
    /// Converts an `Iterator` into a stream.
    ///
    /// NOTE: This type does not implement `Positioned` (and is only `Clone` when the wrapped
    ///     iterator is), so it must be wrapped with types such as `BufferedStreamRef` and
    ///     `State` to become a `Stream` which can be parsed
    pub fn new<T>(iter: T) -> IteratorStream<Input>
    where
        T: IntoIterator<IntoIter = Input, Item = Input::Item>,
    {
        let inner: Input = IntoIterator::into_iter(iter);
        IteratorStream(inner)
    }
}

/// Forwards iteration to the wrapped iterator.
impl<Input> Iterator for IteratorStream<Input>
where
    Input: Iterator,
{
    type Item = Input::Item;

    fn next(&mut self) -> Option<Input::Item> {
        let IteratorStream(inner) = self;
        inner.next()
    }
}
1198
1199impl<Input: Iterator> StreamOnce for IteratorStream<Input>
1200where
1201    Input::Item: Clone + PartialEq,
1202{
1203    type Token = Input::Item;
1204    type Range = Input::Item;
1205    type Position = ();
1206    type Error = UnexpectedParse;
1207
1208    #[inline]
1209    fn uncons(&mut self) -> Result<Self::Token, StreamErrorFor<Self>> {
1210        match self.next() {
1211            Some(x) => Ok(x),
1212            None => Err(UnexpectedParse::Eoi),
1213        }
1214    }
1215}
1216
/// Newtype around a pointer offset into a slice stream (`&[T]`/`&str`).
///
/// The public field holds the raw address; `T` only records the kind of slice it refers to.
pub struct PointerOffset<T: ?Sized>(pub usize, PhantomData<T>);

// The trait implementations below are written by hand so that none of them places a bound on
// `T`; they all operate on the stored address only.

impl<T: ?Sized> Clone for PointerOffset<T> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<T: ?Sized> Copy for PointerOffset<T> {}

impl<T: ?Sized> Default for PointerOffset<T> {
    fn default() -> Self {
        PointerOffset(0, PhantomData)
    }
}

impl<T: ?Sized> PartialEq for PointerOffset<T> {
    fn eq(&self, other: &Self) -> bool {
        usize::eq(&self.0, &other.0)
    }
}

impl<T: ?Sized> Eq for PointerOffset<T> {}

impl<T: ?Sized> PartialOrd for PointerOffset<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl<T: ?Sized> Ord for PointerOffset<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        Ord::cmp(&self.0, &other.0)
    }
}

/// Debug output is the same as the `Display` output.
impl<T> fmt::Debug for PointerOffset<T>
where
    T: ?Sized,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self, f)
    }
}

/// Prints the stored address formatted as a pointer.
impl<T> fmt::Display for PointerOffset<T>
where
    T: ?Sized,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let address = self.0 as *const ();
        write!(f, "PointerOffset({:?})", address)
    }
}

impl<T> PointerOffset<T>
where
    T: ?Sized,
{
    /// Wraps the raw address `offset`.
    pub fn new(offset: usize) -> Self {
        PointerOffset(offset, PhantomData)
    }

    /// Converts the pointer-based position into an indexed position.
    ///
    /// The index is the stored address minus the address of `initial_slice`, so
    /// `initial_slice` should be the input which this position was produced from.
    ///
    /// ```rust
    /// # extern crate combine;
    /// # use combine::*;
    /// # fn main() {
    /// let text = "b";
    /// let err = token('a').easy_parse(text).unwrap_err();
    /// assert_eq!(err.position.0, text.as_ptr() as usize);
    /// assert_eq!(err.map_position(|p| p.translate_position(text)).position, 0);
    /// # }
    /// ```
    pub fn translate_position(self, initial_slice: &T) -> usize {
        let base = initial_slice as *const T as *const () as usize;
        self.0 - base
    }
}
1297
1298/// Decodes `input` using `parser`.
1299///
1300/// Return `Ok(Some(token), committed_data)` if there was enough data to finish parsing using
1301/// `parser`.
1302/// Returns `Ok(None, committed_data)` if `input` did not contain enough data to finish parsing
1303/// using `parser`.
1304///
1305/// See `examples/async.rs` for example usage in a `tokio_io::codec::Decoder`
1306pub fn decode<Input, P>(
1307    mut parser: P,
1308    input: &mut Input,
1309    partial_state: &mut P::PartialState,
1310) -> Result<(Option<P::Output>, usize), <Input as StreamOnce>::Error>
1311where
1312    P: Parser<Input>,
1313    Input: RangeStream,
1314{
1315    let start = input.checkpoint();
1316    match parser.parse_with_state(input, partial_state) {
1317        Ok(message) => Ok((Some(message), input.distance(&start))),
1318        Err(err) => {
1319            if err.is_unexpected_end_of_input() {
1320                if input.is_partial() {
1321                    // The parser expected more input to parse and input is partial, return `None`
1322                    // as we did not finish and also return how much may be removed from the stream
1323                    Ok((None, input.distance(&start)))
1324                } else {
1325                    Err(err)
1326                }
1327            } else {
1328                Err(err)
1329            }
1330        }
1331    }
1332}
1333
1334/// Decodes `input` using `parser`. Like `decode` but works directly in both
1335/// `tokio_util::Decoder::decode` and `tokio_util::Decoder::decode_eof`
1336///
1337/// Return `Ok(Some(token), committed_data)` if there was enough data to finish parsing using
1338/// `parser`.
1339/// Returns `Ok(None, committed_data)` if `input` did not contain enough data to finish parsing
1340/// using `parser`.
1341/// Returns `Ok(None, 0)` if `input` did not contain enough data to finish parsing
1342/// using `parser`.
1343///
1344/// See `examples/async.rs` for example usage in a `tokio_io::codec::Decoder`
1345pub fn decode_tokio<Input, P>(
1346    mut parser: P,
1347    input: &mut Input,
1348    partial_state: &mut P::PartialState,
1349) -> Result<(Option<P::Output>, usize), <Input as StreamOnce>::Error>
1350where
1351    P: Parser<Input>,
1352    Input: RangeStream,
1353{
1354    let start = input.checkpoint();
1355    match parser.parse_with_state(input, partial_state) {
1356        Ok(message) => Ok((Some(message), input.distance(&start))),
1357        Err(err) => {
1358            if err.is_unexpected_end_of_input() {
1359                if input.is_partial() {
1360                    // The parser expected more input to parse and input is partial, return `None`
1361                    // as we did not finish and also return how much may be removed from the stream
1362                    Ok((None, input.distance(&start)))
1363                } else if input_at_eof(input) && input.distance(&start) == 0 {
1364                    // We are at eof and the input is empty, return None to indicate that we are
1365                    // done
1366                    Ok((None, 0))
1367                } else {
1368                    Err(err)
1369                }
1370            } else {
1371                Err(err)
1372            }
1373        }
1374    }
1375}
1376
/// Parses an instance of `std::io::Read` as a `&[u8]` without reading the entire file into
/// memory.
///
/// This is defined as a macro to work around the lack of Higher Ranked Types. See the
/// example for how to pass a parser to the macro (constructing parts of the parser outside of
/// the `decode!` call is unlikely to work).
///
/// Besides `$decoder`, `$read` and `$parser` the macro optionally accepts `$input_stream`, a
/// callable taking `(input, position)` which builds the stream handed to the parser (by
/// default the input is used as is), and `$post_decode`, a callable which receives that stream
/// after each parse attempt (by default `|x| x`).
///
/// Evaluates to `Ok(output)` once the parser produces a value, or to an `Err` holding either
/// the parse error or the error reported while preparing more input (together with the
/// decoder's position).
///
/// ```
/// use std::{
///     fs::File,
/// };
/// use combine::{decode, satisfy, skip_many1, many1, sep_end_by, Parser, stream::Decoder};
///
/// let mut read = File::open("README.md").unwrap();
/// let mut decoder = Decoder::new();
/// let is_whitespace = |b: u8| b == b' ' || b == b'\r' || b == b'\n';
/// assert_eq!(
///     decode!(
///         decoder,
///         read,
///         {
///             let word = many1(satisfy(|b| !is_whitespace(b)));
///             sep_end_by(word, skip_many1(satisfy(is_whitespace))).map(|words: Vec<Vec<u8>>| words.len())
///         },
///         |input, _position| combine::easy::Stream::from(input),
///     ).map_err(combine::easy::Errors::<u8, &[u8], _>::from),
///     Ok(773),
/// );
/// ```
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
#[macro_export]
macro_rules! decode {
    // Only decoder, reader and parser: use the input unchanged and a no-op post-decode step.
    ($decoder: expr, $read: expr, $parser: expr $(,)?) => {
        $crate::decode!($decoder, $read, $parser, |input, _position| input, |x| x)
    };

    // Custom stream constructor, no-op post-decode step.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr $(,)?) => {
        $crate::decode!($decoder, $read, $parser, $input_stream, |x| x)
    };

    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr, $post_decode: expr $(,)?) => {
        // Bind both arguments by mutable reference so they are evaluated once and not moved.
        match $decoder {
            ref mut decoder => match $read {
                ref mut read => 'outer: loop {
                    // One parse attempt over the currently buffered data. Yields the optional
                    // output and the amount of input which `decode` reported as consumed.
                    let (opt, removed) = {
                        let (state, position, buffer, end_of_input) = decoder.__inner();
                        let buffer =
                            $crate::stream::buf_reader::CombineBuffer::buffer(buffer, read);

                        // The stream is marked partial for as long as the end of the input
                        // has not been seen.
                        let mut stream = $crate::stream::call_with2(
                            $crate::stream::MaybePartialStream(buffer, !end_of_input),
                            *position,
                            $input_stream,
                        );
                        let result = $crate::stream::decode($parser, &mut stream, state);
                        // Remember where the stream stopped before handing it to `$post_decode`.
                        *position = $crate::stream::Positioned::position(&stream);
                        $crate::stream::call_with(stream, $post_decode);
                        match result {
                            Ok(x) => x,
                            Err(err) => {
                                break 'outer Err($crate::stream::decoder::Error::Parse(err))
                            }
                        }
                    };

                    decoder.advance(&mut *read, removed);

                    if let Some(v) = opt {
                        break 'outer Ok(v);
                    }

                    // No value yet: let the decoder prepare for another attempt
                    // (presumably by reading more data from `read`; confirm in `Decoder`).
                    match decoder.__before_parse(&mut *read) {
                        Ok(x) => x,
                        Err(error) => {
                            break 'outer Err($crate::stream::decoder::Error::Io {
                                error,
                                position: Clone::clone(decoder.position()),
                            })
                        }
                    };
                },
            },
        }
    };
}
1463
/// Parses an instance of `futures::io::AsyncRead` as a `&[u8]` without reading the entire file into
/// memory.
///
/// This is defined as a macro to work around the lack of Higher Ranked Types. See the
/// example for how to pass a parser to the macro (constructing parts of the parser outside of
/// the `decode_futures_03!` call is unlikely to work).
///
/// Besides `$decoder`, `$read` and `$parser` the macro optionally accepts `$input_stream`, a
/// callable taking `(input, position)` which builds the stream handed to the parser (by
/// default the input is used as is), and `$post_decode`, a callable which receives that stream
/// after each parse attempt (by default `|x| x`).
///
/// The expansion contains `.await` and must therefore be used inside an `async` context. It
/// evaluates to `Ok(output)` once the parser produces a value, or to an `Err` holding either
/// the parse error or the error reported while preparing more input (together with the
/// decoder's position).
///
/// ```
/// # use futures_03_dep as futures;
/// use futures::pin_mut;
/// use async_std::{
///     fs::File,
///     task,
/// };
///
/// use combine::{decode_futures_03, satisfy, skip_many1, many1, sep_end_by, Parser, stream::Decoder};
///
/// fn main() {
///     task::block_on(main_());
/// }
///
/// async fn main_() {
///     let mut read = File::open("README.md").await.unwrap();
///     let mut decoder = Decoder::new();
///     let is_whitespace = |b: u8| b == b' ' || b == b'\r' || b == b'\n';
///     assert_eq!(
///         decode_futures_03!(
///             decoder,
///             read,
///             {
///                 let word = many1(satisfy(|b| !is_whitespace(b)));
///                 sep_end_by(word, skip_many1(satisfy(is_whitespace))).map(|words: Vec<Vec<u8>>| words.len())
///             },
///             |input, _position| combine::easy::Stream::from(input),
///         ).map_err(combine::easy::Errors::<u8, &[u8], _>::from),
///         Ok(773),
///     );
/// }
/// ```
#[cfg(feature = "futures-io-03")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures-io-03")))]
#[macro_export]
macro_rules! decode_futures_03 {
    // Only decoder, reader and parser: use the input unchanged. The stream constructor is
    // called with `(input, position)`, so the default must accept two arguments.
    ($decoder: expr, $read: expr, $parser: expr $(,)?) => {
        $crate::decode_futures_03!($decoder, $read, $parser, |input, _position| input)
    };

    // Custom stream constructor, no-op post-decode step.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr $(,)?) => {
        $crate::decode_futures_03!($decoder, $read, $parser, $input_stream, |x| x)
    };

    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr, $post_decode: expr $(,)?) => {
        // Bind both arguments by mutable reference so they are evaluated once and not moved.
        match $decoder {
            ref mut decoder => match $read {
                ref mut read => 'outer: loop {
                    // One parse attempt over the currently buffered data. Yields the optional
                    // output and the amount of input which `decode` reported as consumed.
                    let (opt, removed) = {
                        let (state, position, buffer, end_of_input) = decoder.__inner();
                        let buffer =
                            $crate::stream::buf_reader::CombineBuffer::buffer(buffer, &*read);

                        // The stream is marked partial for as long as the end of the input
                        // has not been seen.
                        let mut stream = $crate::stream::call_with2(
                            $crate::stream::MaybePartialStream(buffer, !end_of_input),
                            *position,
                            $input_stream,
                        );
                        let result = $crate::stream::decode($parser, &mut stream, state);
                        // Remember where the stream stopped before handing it to `$post_decode`.
                        *position = $crate::stream::Positioned::position(&stream);
                        $crate::stream::call_with(stream, $post_decode);
                        match result {
                            Ok(x) => x,
                            Err(err) => break 'outer Err($crate::stream::decoder::Error::Parse(err)),
                        }
                    };

                    decoder.advance_pin(std::pin::Pin::new(&mut *read), removed);

                    if let Some(v) = opt {
                        break 'outer Ok(v);
                    }

                    // No value yet: let the decoder prepare for another attempt
                    // (presumably by reading more data from `read`; confirm in `Decoder`).
                    match decoder.__before_parse_async(std::pin::Pin::new(&mut *read)).await {
                        Ok(_) => (),
                        Err(error) => {
                            break 'outer Err($crate::stream::decoder::Error::Io {
                                error,
                                position: Clone::clone(decoder.position()),
                            })
                        }
                    };
                }
            }
        }
    };
}
1560
/// Parses an instance of `tokio::io::AsyncRead` as a `&[u8]` without reading the entire file into
/// memory.
///
/// This is defined as a macro to work around the lack of Higher Ranked Types. See the
/// example for how to pass a parser to the macro (constructing parts of the parser outside of
/// the `decode_tokio_02!` call is unlikely to work).
///
/// Besides `$decoder`, `$read` and `$parser` the macro optionally accepts `$input_stream`, a
/// callable taking `(input, position)` which builds the stream handed to the parser (by
/// default the input is used as is), and `$post_decode`, a callable which receives that stream
/// after each parse attempt (by default `|x| x`).
///
/// The expansion contains `.await` and must therefore be used inside an `async` context. It
/// evaluates to `Ok(output)` once the parser produces a value, or to an `Err` holding either
/// the parse error or the error reported while preparing more input (together with the
/// decoder's position).
///
/// ```
/// # use tokio_02_dep as tokio;
/// # use futures_03_dep as futures;
/// use futures::pin_mut;
/// use tokio::{
///     fs::File,
/// };
///
/// use combine::{decode_tokio_02, satisfy, skip_many1, many1, sep_end_by, Parser, stream::{Decoder, buf_reader::BufReader}};
///
/// #[tokio::main]
/// async fn main() {
///     let mut read = BufReader::new(File::open("README.md").await.unwrap());
///     let mut decoder = Decoder::new_bufferless();
///     let is_whitespace = |b: u8| b == b' ' || b == b'\r' || b == b'\n';
///     assert_eq!(
///         decode_tokio_02!(
///             decoder,
///             read,
///             {
///                 let word = many1(satisfy(|b| !is_whitespace(b)));
///                 sep_end_by(word, skip_many1(satisfy(is_whitespace))).map(|words: Vec<Vec<u8>>| words.len())
///             },
///             |input, _position| combine::easy::Stream::from(input),
///         ).map_err(combine::easy::Errors::<u8, &[u8], _>::from),
///         Ok(773),
///     );
/// }
/// ```
#[cfg(feature = "tokio-02")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-02")))]
#[macro_export]
macro_rules! decode_tokio_02 {
    // Only decoder, reader and parser: use the input unchanged.
    ($decoder: expr, $read: expr, $parser: expr $(,)?) => {
        $crate::decode_tokio_02!($decoder, $read, $parser, |input, _position| input)
    };

    // Custom stream constructor, no-op post-decode step.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr $(,)?) => {
        $crate::decode_tokio_02!($decoder, $read, $parser, $input_stream, |x| x)
    };

    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr, $post_decode: expr $(,)?) => {
        // Bind both arguments by mutable reference so they are evaluated once and not moved.
        match $decoder {
            ref mut decoder => match $read {
                ref mut read => 'outer: loop {
                    // One parse attempt over the currently buffered data. Yields the optional
                    // output and the amount of input which `decode` reported as consumed.
                    let (opt, removed) = {
                        let (state, position, buffer, end_of_input) = decoder.__inner();
                        let buffer =
                            $crate::stream::buf_reader::CombineBuffer::buffer(buffer, &*read);
                        // The stream is marked partial for as long as the end of the input
                        // has not been seen.
                        let mut stream = $crate::stream::call_with2(
                            $crate::stream::MaybePartialStream(buffer, !end_of_input),
                            *position,
                            $input_stream,
                        );
                        let result = $crate::stream::decode($parser, &mut stream, state);
                        // Remember where the stream stopped before handing it to `$post_decode`.
                        *position = $crate::stream::Positioned::position(&stream);
                        $crate::stream::call_with(stream, $post_decode);
                        match result {
                            Ok(x) => x,
                            Err(err) => {
                                break 'outer Err($crate::stream::decoder::Error::Parse(err))
                            }
                        }
                    };

                    decoder.advance_pin(std::pin::Pin::new(read), removed);

                    if let Some(v) = opt {
                        break 'outer Ok(v);
                    }

                    // No value yet: let the decoder prepare for another attempt
                    // (presumably by reading more data from `read`; confirm in `Decoder`).
                    match decoder
                        .__before_parse_tokio_02(std::pin::Pin::new(&mut *read))
                        .await
                    {
                        Ok(x) => x,
                        Err(error) => {
                            break 'outer Err($crate::stream::decoder::Error::Io {
                                error,
                                position: Clone::clone(decoder.position()),
                            })
                        }
                    };
                },
            },
        }
    };
}
1656
/// Parses an instance of `tokio::io::AsyncRead` as a `&[u8]` without reading the entire file into
/// memory.
///
/// This is defined as a macro to work around the lack of Higher Ranked Types. See the
/// example for how to pass a parser to the macro (constructing parts of the parser outside of
/// the `decode_tokio_03!` call is unlikely to work).
///
/// Besides `$decoder`, `$read` and `$parser` the macro optionally accepts `$input_stream`, a
/// callable taking `(input, position)` which builds the stream handed to the parser (by
/// default the input is used as is), and `$post_decode`, a callable which receives that stream
/// after each parse attempt (by default `|x| x`).
///
/// The expansion contains `.await` and must therefore be used inside an `async` context. It
/// evaluates to `Ok(output)` once the parser produces a value, or to an `Err` holding either
/// the parse error or the error reported while preparing more input (together with the
/// decoder's position).
///
/// ```
/// # use tokio_03_dep as tokio;
/// # use futures_03_dep as futures;
/// use futures::pin_mut;
/// use tokio::{
///     fs::File,
/// };
///
/// use combine::{decode_tokio_03, satisfy, skip_many1, many1, sep_end_by, Parser, stream::{Decoder, buf_reader::BufReader}};
///
/// #[tokio::main]
/// async fn main() {
///     let mut read = BufReader::new(File::open("README.md").await.unwrap());
///     let mut decoder = Decoder::new_bufferless();
///     let is_whitespace = |b: u8| b == b' ' || b == b'\r' || b == b'\n';
///     assert_eq!(
///         decode_tokio_03!(
///             decoder,
///             read,
///             {
///                 let word = many1(satisfy(|b| !is_whitespace(b)));
///                 sep_end_by(word, skip_many1(satisfy(is_whitespace))).map(|words: Vec<Vec<u8>>| words.len())
///             },
///             |input, _position| combine::easy::Stream::from(input),
///         ).map_err(combine::easy::Errors::<u8, &[u8], _>::from),
///         Ok(773),
///     );
/// }
/// ```
#[cfg(feature = "tokio-03")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio-03")))]
#[macro_export]
macro_rules! decode_tokio_03 {
    // Only decoder, reader and parser: use the input unchanged.
    ($decoder: expr, $read: expr, $parser: expr $(,)?) => {
        $crate::decode_tokio_03!($decoder, $read, $parser, |input, _position| input)
    };

    // Custom stream constructor, no-op post-decode step.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr $(,)?) => {
        $crate::decode_tokio_03!($decoder, $read, $parser, $input_stream, |x| x)
    };

    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr, $post_decode: expr $(,)?) => {
        // Bind both arguments by mutable reference so they are evaluated once and not moved.
        match $decoder {
            ref mut decoder => match $read {
                ref mut read => 'outer: loop {
                    // One parse attempt over the currently buffered data. Yields the optional
                    // output and the amount of input which `decode` reported as consumed.
                    let (opt, removed) = {
                        let (state, position, buffer, end_of_input) = decoder.__inner();
                        let buffer =
                            $crate::stream::buf_reader::CombineBuffer::buffer(buffer, &*read);
                        // The stream is marked partial for as long as the end of the input
                        // has not been seen.
                        let mut stream = $crate::stream::call_with2(
                            $crate::stream::MaybePartialStream(buffer, !end_of_input),
                            *position,
                            $input_stream,
                        );
                        let result = $crate::stream::decode($parser, &mut stream, state);
                        // Remember where the stream stopped before handing it to `$post_decode`.
                        *position = $crate::stream::Positioned::position(&stream);
                        $crate::stream::call_with(stream, $post_decode);
                        match result {
                            Ok(x) => x,
                            Err(err) => {
                                break 'outer Err($crate::stream::decoder::Error::Parse(err))
                            }
                        }
                    };

                    decoder.advance_pin(std::pin::Pin::new(read), removed);

                    if let Some(v) = opt {
                        break 'outer Ok(v);
                    }

                    // No value yet: let the decoder prepare for another attempt
                    // (presumably by reading more data from `read`; confirm in `Decoder`).
                    match decoder
                        .__before_parse_tokio_03(std::pin::Pin::new(&mut *read))
                        .await
                    {
                        Ok(x) => x,
                        Err(error) => {
                            break 'outer Err($crate::stream::decoder::Error::Io {
                                error,
                                position: Clone::clone(decoder.position()),
                            })
                        }
                    };
                },
            },
        }
    };
}
1752
/// Parses an instance of `tokio::io::AsyncRead` as a `&[u8]` without reading the entire file into
/// memory.
///
/// This is defined as a macro to work around the lack of Higher Ranked Types. See the
/// example for how to pass a parser to the macro (constructing parts of the parser outside of
/// the `decode_tokio!` call is unlikely to work).
///
/// Besides `$decoder`, `$read` and `$parser` the macro optionally accepts `$input_stream`, a
/// callable taking `(input, position)` which builds the stream handed to the parser (by
/// default the input is used as is), and `$post_decode`, a callable which receives that stream
/// after each parse attempt (by default `|x| x`).
///
/// The expansion contains `.await` and must therefore be used inside an `async` context. It
/// evaluates to `Ok(output)` once the parser produces a value, or to an `Err` holding either
/// the parse error or the error reported while preparing more input (together with the
/// decoder's position).
///
/// ```
/// # use tokio_dep as tokio;
/// # use futures_03_dep as futures;
/// use futures::pin_mut;
/// use tokio::{
///     fs::File,
/// };
///
/// use combine::{decode_tokio, satisfy, skip_many1, many1, sep_end_by, Parser, stream::{Decoder, buf_reader::BufReader}};
///
/// #[tokio::main]
/// async fn main() {
///     let mut read = BufReader::new(File::open("README.md").await.unwrap());
///     let mut decoder = Decoder::new_bufferless();
///     let is_whitespace = |b: u8| b == b' ' || b == b'\r' || b == b'\n';
///     assert_eq!(
///         decode_tokio!(
///             decoder,
///             read,
///             {
///                 let word = many1(satisfy(|b| !is_whitespace(b)));
///                 sep_end_by(word, skip_many1(satisfy(is_whitespace))).map(|words: Vec<Vec<u8>>| words.len())
///             },
///             |input, _position| combine::easy::Stream::from(input),
///         ).map_err(combine::easy::Errors::<u8, &[u8], _>::from),
///         Ok(773),
///     );
/// }
/// ```
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
#[macro_export]
macro_rules! decode_tokio {
    // Only decoder, reader and parser: use the input unchanged.
    ($decoder: expr, $read: expr, $parser: expr $(,)?) => {
        $crate::decode_tokio!($decoder, $read, $parser, |input, _position| input)
    };

    // Custom stream constructor, no-op post-decode step.
    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr $(,)?) => {
        $crate::decode_tokio!($decoder, $read, $parser, $input_stream, |x| x)
    };

    ($decoder: expr, $read: expr, $parser: expr, $input_stream: expr, $post_decode: expr $(,)?) => {
        // Bind both arguments by mutable reference so they are evaluated once and not moved.
        match $decoder {
            ref mut decoder => match $read {
                ref mut read => 'outer: loop {
                    // One parse attempt over the currently buffered data. Yields the optional
                    // output and the amount of input which `decode` reported as consumed.
                    let (opt, removed) = {
                        let (state, position, buffer, end_of_input) = decoder.__inner();
                        let buffer =
                            $crate::stream::buf_reader::CombineBuffer::buffer(buffer, &*read);
                        // The stream is marked partial for as long as the end of the input
                        // has not been seen.
                        let mut stream = $crate::stream::call_with2(
                            $crate::stream::MaybePartialStream(buffer, !end_of_input),
                            *position,
                            $input_stream,
                        );
                        let result = $crate::stream::decode($parser, &mut stream, state);
                        // Remember where the stream stopped before handing it to `$post_decode`.
                        *position = $crate::stream::Positioned::position(&stream);
                        $crate::stream::call_with(stream, $post_decode);
                        match result {
                            Ok(x) => x,
                            Err(err) => {
                                break 'outer Err($crate::stream::decoder::Error::Parse(err))
                            }
                        }
                    };

                    decoder.advance_pin(std::pin::Pin::new(read), removed);

                    if let Some(v) = opt {
                        break 'outer Ok(v);
                    }

                    // No value yet: let the decoder prepare for another attempt
                    // (presumably by reading more data from `read`; confirm in `Decoder`).
                    match decoder
                        .__before_parse_tokio(std::pin::Pin::new(&mut *read))
                        .await
                    {
                        Ok(x) => x,
                        Err(error) => {
                            break 'outer Err($crate::stream::decoder::Error::Io {
                                error,
                                position: Clone::clone(decoder.position()),
                            })
                        }
                    };
                },
            },
        }
    };
}
1848
// Applies `f` to `(a, b)`. Used by the `decode*` macros to invoke the caller-supplied stream
// constructor; going through a generic function gives the closure an expected signature.
#[doc(hidden)]
pub fn call_with2<F: FnOnce(A, B) -> R, A, B, R>(a: A, b: B, f: F) -> R {
    let output = f(a, b);
    output
}
1856
// Applies `f` to `a`. Used by the `decode*` macros to invoke the caller-supplied post-decode
// callable.
#[doc(hidden)]
pub fn call_with<F: FnOnce(A) -> R, A, R>(a: A, f: F) -> R {
    let output = f(a);
    output
}
1864
#[cfg(test)]
mod tests {

    use super::*;

    /// Taking a range which ends exactly at the end of the input must succeed, including the
    /// empty range on empty input.
    #[test]
    #[inline]
    fn uncons_range_at_end() {
        let mut empty_str = "";
        assert_eq!(empty_str.uncons_range(0), Ok(""));

        let mut digits = "123";
        assert_eq!(digits.uncons_range(3), Ok("123"));

        let mut single = &[1][..];
        assert_eq!(single.uncons_range(1), Ok(&[1][..]));

        let s: &[u8] = &[];
        let mut wrapped = SliceStream(s);
        assert_eq!(wrapped.uncons_range(0), Ok(&[][..]));
    }

    /// `distance` counts items rather than bytes, also for items wider than one byte.
    #[test]
    fn larger_than_1_byte_items_return_correct_distance() {
        let mut stream = &[123i32, 0i32][..];

        let origin = stream.checkpoint();
        assert_eq!(stream.distance(&origin), 0);

        for consumed in 1..3 {
            stream.uncons().unwrap();
            assert_eq!(stream.distance(&origin), consumed);
        }

        stream.reset(origin).unwrap();
        assert_eq!(stream.distance(&origin), 0);
    }
}