combine/stream/
buf_reader.rs

1use std::io::{self, BufRead, Read};
2
3#[cfg(any(
4    feature = "futures-03",
5    feature = "tokio-02",
6    feature = "tokio-03",
7    feature = "tokio"
8))]
9use std::pin::Pin;
10
11#[cfg(any(feature = "futures-03", feature = "tokio-02", feature = "tokio-03"))]
12use std::mem::MaybeUninit;
13
14#[cfg(feature = "futures-core-03")]
15use std::task::{Context, Poll};
16
17#[cfg(feature = "futures-03")]
18use std::future::Future;
19
20use bytes::{Buf, BufMut, BytesMut};
21
22#[cfg(feature = "pin-project-lite")]
23use pin_project_lite::pin_project;
24
25#[cfg(feature = "tokio-03")]
26use tokio_03_dep::io::AsyncBufRead as _;
27
28#[cfg(feature = "tokio")]
29use tokio_dep::io::AsyncBufRead as _;
30
31#[cfg(feature = "futures-core-03")]
32use futures_core_03::ready;
33
#[cfg(feature = "pin-project-lite")]
pin_project! {
    /// `BufReader` used by `Decoder` when it is constructed with [`Decoder::new_bufferless`][]
    ///
    /// Pairs a reader with a `BytesMut` holding bytes that were read from it but not yet
    /// consumed. This definition is compiled when the `pin-project-lite` feature is enabled and
    /// goes through `pin_project!` so that `inner` can be projected as a pinned field.
    ///
    /// [`Decoder::new_bufferless`]: ../decoder/struct.Decoder.html#method.new_bufferless
    #[derive(Debug)]
    pub struct BufReader<R> {
        // The wrapped reader; marked `#[pin]` so projections yield `Pin<&mut R>`.
        #[pin]
        inner: R,
        // Bytes read from `inner` that have not been consumed yet.
        buf: BytesMut
    }
}
46
#[cfg(not(feature = "pin-project-lite"))]
/// `BufReader` used by `Decoder` when it is constructed with [`Decoder::new_bufferless`][]
///
/// Pairs a reader with a `BytesMut` holding bytes that were read from it but not yet consumed.
/// This plain definition is compiled when the `pin-project-lite` feature is disabled.
///
/// [`Decoder::new_bufferless`]: ../decoder/struct.Decoder.html#method.new_bufferless
#[derive(Debug)]
pub struct BufReader<R> {
    // The wrapped reader.
    inner: R,
    // Bytes read from `inner` that have not been consumed yet.
    buf: BytesMut,
}
56
57impl<R> BufReader<R> {
58    /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
59    /// but may change in the future.
60    pub fn new(inner: R) -> Self {
61        Self::with_capacity(8096, inner)
62    }
63
64    /// Creates a new `BufReader` with the specified buffer capacity.
65    pub fn with_capacity(capacity: usize, inner: R) -> Self {
66        let buf = BytesMut::with_capacity(capacity);
67
68        Self { inner, buf }
69    }
70
71    /// Gets a reference to the underlying reader.
72    ///
73    /// It is inadvisable to directly read from the underlying reader.
74    pub fn get_ref(&self) -> &R {
75        &self.inner
76    }
77
78    /// Gets a mutable reference to the underlying reader.
79    ///
80    /// It is inadvisable to directly read from the underlying reader.
81    pub fn get_mut(&mut self) -> &mut R {
82        &mut self.inner
83    }
84
85    #[cfg(feature = "pin-project-lite")]
86    /// Gets a pinned mutable reference to the underlying reader.
87    ///
88    /// It is inadvisable to directly read from the underlying reader.
89    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
90        self.project().inner
91    }
92
93    /// Consumes this `BufWriter`, returning the underlying reader.
94    ///
95    /// Note that any leftover data in the internal buffer is lost.
96    pub fn into_inner(self) -> R {
97        self.inner
98    }
99
100    /// Returns a reference to the internally buffered data.
101    ///
102    /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
103    pub fn buffer(&self) -> &[u8] {
104        &self.buf
105    }
106
107    /// Invalidates all data in the internal buffer.
108    #[inline]
109    #[cfg(any(feature = "tokio-02", feature = "tokio-03", feature = "tokio"))]
110    fn discard_buffer(self: Pin<&mut Self>) {
111        let me = self.project();
112        me.buf.clear();
113    }
114}
115
// Private module holding the marker supertrait of `CombineBuffer`. Because the module itself is
// not `pub`, code outside this file cannot name `Sealed` and therefore cannot implement the
// `Combine*` traits for its own types.
mod sealed {
    pub trait Sealed {}
}
119
#[doc(hidden)]
/// Access to the buffered bytes that belong to a reader of type `R`.
///
/// In this file it is implemented by `Buffer`, which stores the bytes itself, and by
/// `Bufferless`, which uses the buffer inside a `BufReader`.
pub trait CombineBuffer<R>: sealed::Sealed {
    /// Returns the bytes that are currently buffered.
    fn buffer<'a>(&'a self, read: &'a R) -> &'a [u8];

    /// Advances past the first `len` buffered bytes.
    fn advance(&mut self, read: &mut R, len: usize);

    /// Same as `advance`, but for a pinned reader.
    #[cfg(feature = "pin-project-lite")]
    fn advance_pin(&mut self, read: Pin<&mut R>, len: usize);
}
129
#[doc(hidden)]
/// Filling of a `CombineBuffer` from a synchronous reader.
///
/// The implementations in this file require the reader to implement `std::io::Read`.
pub trait CombineSyncRead<R>: CombineBuffer<R> {
    /// Reads from `read` and appends the bytes to the buffer.
    ///
    /// Returns the number of bytes that were appended, or the I/O error from the reader.
    fn extend_buf_sync(&mut self, read: &mut R) -> io::Result<usize>;
}
134
#[cfg(any(feature = "tokio-02", feature = "tokio-03", feature = "tokio"))]
#[doc(hidden)]
/// Filling of a `CombineBuffer` from a tokio `AsyncRead`.
///
/// `T` does not appear in the method signature. The implementations in this file set it to
/// `dyn AsyncRead` of the respective tokio version, which presumably serves to keep the
/// implementations for the different tokio versions from overlapping.
pub trait CombineRead<R, T: ?Sized>: CombineBuffer<R> {
    /// Polls `read` for more bytes and appends them to the buffer.
    ///
    /// Resolves to the number of bytes that were appended, or the I/O error from the reader.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut R>,
    ) -> Poll<io::Result<usize>>;
}
144
#[cfg(feature = "futures-03")]
#[doc(hidden)]
/// Filling of a `CombineBuffer` from a `futures` 0.3 `AsyncRead`.
pub trait CombineAsyncRead<R>: CombineBuffer<R> {
    /// Polls `read` for more bytes and appends them to the buffer.
    ///
    /// Resolves to the number of bytes that were appended, or the I/O error from the reader.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut R>,
    ) -> Poll<io::Result<usize>>;

    /// Returns a future which drives `poll_extend_buf` with `read`.
    fn extend_buf<'a>(&'a mut self, read: Pin<&'a mut R>) -> ExtendBuf<'a, Self, R>
    where
        Self: Sized;
}
158
#[cfg(feature = "futures-03")]
pin_project_lite::pin_project! {
    /// Future returned by `CombineAsyncRead::extend_buf`.
    ///
    /// Polling it calls `poll_extend_buf` on `buffer`, passing `read`.
    #[doc(hidden)]
    pub struct ExtendBuf<'a, C, R> {
        // The buffer marker whose `poll_extend_buf` is driven.
        buffer: &'a mut C,
        // The pinned reader that is handed to `poll_extend_buf` on every poll.
        read: Pin<&'a mut R>
    }
}
167
#[cfg(feature = "futures-03")]
impl<'a, C, R> Future for ExtendBuf<'a, C, R>
where
    C: CombineAsyncRead<R>,
{
    type Output = io::Result<usize>;

    /// Forwards the poll to `poll_extend_buf` of the stored buffer, handing it a reborrow of
    /// the stored pinned reader.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let reader = this.read.as_mut();
        this.buffer.poll_extend_buf(cx, reader)
    }
}
180
/// Marker used by `Decoder` for an internal buffer
///
/// Wraps the `BytesMut` that holds the buffered bytes; the `Combine*` implementations for this
/// type operate on that field and never on the reader's own state.
#[derive(Default)]
pub struct Buffer(pub(crate) BytesMut);
184
// Satisfies the `sealed::Sealed` supertrait required by `CombineBuffer`.
impl sealed::Sealed for Buffer {}
186
187impl<R> CombineBuffer<R> for Buffer {
188    fn buffer<'a>(&'a self, _read: &'a R) -> &'a [u8] {
189        &self.0
190    }
191
192    fn advance(&mut self, _read: &mut R, len: usize) {
193        self.0.advance(len);
194    }
195
196    #[cfg(feature = "pin-project-lite")]
197    fn advance_pin(&mut self, _read: Pin<&mut R>, len: usize) {
198        self.0.advance(len);
199    }
200}
201
202impl<R> CombineSyncRead<R> for Buffer
203where
204    R: Read,
205{
206    fn extend_buf_sync(&mut self, read: &mut R) -> io::Result<usize> {
207        extend_buf_sync(&mut self.0, read)
208    }
209}
210
#[cfg(feature = "futures-03")]
impl<R> CombineAsyncRead<R> for Buffer
where
    R: futures_io_03::AsyncRead,
{
    /// Polls `read` into the stored `BytesMut` via the free `poll_extend_buf` helper.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut R>,
    ) -> Poll<io::Result<usize>> {
        let Buffer(bytes) = self;
        poll_extend_buf(bytes, cx, read)
    }

    /// Prepares the stored buffer and returns the future that performs the read.
    fn extend_buf<'a>(&'a mut self, read: Pin<&'a mut R>) -> ExtendBuf<'a, Self, R> {
        {
            let bytes = &mut self.0;
            if !bytes.has_remaining_mut() {
                bytes.reserve(8 * 1024);
            }
            // Copy of tokio's read_buf method (but it has to force initialize the buffer)
            let spare = bytes.chunk_mut();
            let spare_len = spare.len();
            (0..spare_len).for_each(|idx| spare.write_byte(idx, 0));
        }
        ExtendBuf { buffer: self, read }
    }
}
237
#[cfg(feature = "tokio-02")]
impl<R> CombineRead<R, dyn tokio_02_dep::io::AsyncRead> for Buffer
where
    R: tokio_02_dep::io::AsyncRead,
{
    /// Polls `read` into the stored `BytesMut`, exposed to tokio 0.2 through the `Bytes05`
    /// adapter.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut R>,
    ) -> Poll<io::Result<usize>> {
        let bytes = &mut self.0;
        if !bytes.has_remaining_mut() {
            bytes.reserve(8 * 1024);
        }
        let mut target = Bytes05(bytes);
        read.poll_read_buf(cx, &mut target)
    }
}
254
#[cfg(feature = "tokio-03")]
/// Wraps the chunk returned by `bs.chunk_mut()` in a tokio 0.3 `ReadBuf`.
///
/// The returned `ReadBuf` borrows from `bs`; nothing is marked as written in `bs` here.
fn tokio_03_to_read_buf(bs: &mut BytesMut) -> tokio_03_dep::io::ReadBuf<'_> {
    let uninit = bs.chunk_mut();
    // SAFETY: the pointer and the length both come from `uninit`, so they describe one region
    // that is mutably borrowed from `bs` for the lifetime of the returned `ReadBuf`. Viewing
    // the bytes as `MaybeUninit<u8>` makes no claim that they are initialized.
    unsafe {
        tokio_03_dep::io::ReadBuf::uninit(std::slice::from_raw_parts_mut(
            uninit.as_mut_ptr() as *mut MaybeUninit<u8>,
            uninit.len(),
        ))
    }
}
265
#[cfg(feature = "tokio-03")]
impl<R> CombineRead<R, dyn tokio_03_dep::io::AsyncRead> for Buffer
where
    R: tokio_03_dep::io::AsyncRead,
{
    /// Polls `read` into the stored `BytesMut` via `tokio_03_read_buf`.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut R>,
    ) -> Poll<io::Result<usize>> {
        let Buffer(bytes) = self;
        tokio_03_read_buf(cx, read, bytes)
    }
}
279
#[cfg(feature = "tokio-03")]
/// Polls a tokio 0.3 reader once and appends what it produced to `bs`.
///
/// Resolves to the number of bytes appended. Returns `Poll::Pending` or the reader's error
/// (through `ready!` and `?`) without advancing `bs`.
fn tokio_03_read_buf(
    cx: &mut Context<'_>,
    read: Pin<&mut impl tokio_03_dep::io::AsyncRead>,
    bs: &mut bytes::BytesMut,
) -> Poll<io::Result<usize>> {
    if !bs.has_remaining_mut() {
        bs.reserve(8 * 1024);
    }

    // `buf` is a view over the chunk of `bs` obtained by `tokio_03_to_read_buf`.
    let mut buf = tokio_03_to_read_buf(bs);
    ready!(read.poll_read(cx, &mut buf))?;
    // SAFETY: `n` is the length of the filled part of `buf`, which lies inside the chunk of
    // `bs` that `buf` was created from. This relies on `ReadBuf` only counting bytes as filled
    // once they have been written — see the tokio `ReadBuf` documentation.
    unsafe {
        let n = buf.filled().len();
        bs.advance_mut(n);
        Poll::Ready(Ok(n))
    }
}
298
#[cfg(feature = "tokio")]
impl<R> CombineRead<R, dyn tokio_dep::io::AsyncRead> for Buffer
where
    R: tokio_dep::io::AsyncRead,
{
    /// Polls `read` into the stored `BytesMut` via `tokio_read_buf`.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut R>,
    ) -> Poll<io::Result<usize>> {
        let Buffer(bytes) = self;
        tokio_read_buf(read, cx, bytes)
    }
}
312
#[cfg(feature = "tokio")]
/// Polls a tokio 1.x reader once, letting `tokio_util::io::poll_read_buf` append to `bs`.
///
/// An extra `8 * 1024` bytes are reserved first when `bs` reports no remaining room.
fn tokio_read_buf(
    read: Pin<&mut impl tokio_dep::io::AsyncRead>,
    cx: &mut Context<'_>,
    bs: &mut bytes::BytesMut,
) -> Poll<io::Result<usize>> {
    let out_of_room = !bs.has_remaining_mut();
    if out_of_room {
        bs.reserve(8 * 1024);
    }

    tokio_util::io::poll_read_buf(read, cx, bs)
}
325
/// Marker used by `Decoder` for an external buffer
///
/// Holds no data itself: the `Combine*` implementations for this type operate on the buffer
/// stored inside the `BufReader` that is passed in as the reader.
#[derive(Default)]
pub struct Bufferless;
329
// Satisfies the `sealed::Sealed` supertrait required by `CombineBuffer`.
impl sealed::Sealed for Bufferless {}
331
332impl<R> CombineBuffer<BufReader<R>> for Bufferless {
333    fn buffer<'a>(&'a self, read: &'a BufReader<R>) -> &'a [u8] {
334        &read.buf
335    }
336
337    fn advance(&mut self, read: &mut BufReader<R>, len: usize) {
338        read.buf.advance(len);
339    }
340
341    #[cfg(feature = "pin-project-lite")]
342    fn advance_pin(&mut self, read: Pin<&mut BufReader<R>>, len: usize) {
343        read.project().buf.advance(len);
344    }
345}
346
347impl<R> CombineSyncRead<BufReader<R>> for Bufferless
348where
349    R: Read,
350{
351    fn extend_buf_sync(&mut self, read: &mut BufReader<R>) -> io::Result<usize> {
352        extend_buf_sync(&mut read.buf, &mut read.inner)
353    }
354}
355
356fn extend_buf_sync<R>(buf: &mut BytesMut, read: &mut R) -> io::Result<usize>
357where
358    R: Read,
359{
360    let size = 8 * 1024;
361    if !buf.has_remaining_mut() {
362        buf.reserve(size);
363    }
364
365    // Copy of tokio's poll_read_buf method (but it has to force initialize the buffer)
366    let n = {
367        let bs = buf.chunk_mut();
368
369        let initial_size = bs.len().min(size);
370        let bs = &mut bs[..initial_size];
371        for i in 0..bs.len() {
372            bs.write_byte(i, 0);
373        }
374
375        // Convert to `&mut [u8]`
376        // SAFETY: the entire buffer is preinitialized above
377        let bs = unsafe { &mut *(bs as *mut _ as *mut [u8]) };
378
379        let n = read.read(bs)?;
380        assert!(
381            n <= bs.len(),
382            "AsyncRead reported that it initialized more than the number of bytes in the buffer"
383        );
384        n
385    };
386
387    // SAFETY: the entire buffer has been preinitialized
388    unsafe { buf.advance_mut(n) };
389
390    Ok(n)
391}
392
#[cfg(feature = "tokio-02")]
// Adapter that lets a `BytesMut` from the `bytes` crate used by this file be passed where the
// `bytes_05::BufMut` trait is expected (see the `poll_read_buf` calls for tokio 0.2).
struct Bytes05<'a>(&'a mut BytesMut);
395
#[cfg(feature = "tokio-02")]
// Every method forwards to the wrapped `BytesMut`.
impl bytes_05::BufMut for Bytes05<'_> {
    fn remaining_mut(&self) -> usize {
        self.0.remaining_mut()
    }
    // The caller's obligations are passed on unchanged to `BytesMut::advance_mut`.
    unsafe fn advance_mut(&mut self, cnt: usize) {
        self.0.advance_mut(cnt)
    }
    fn bytes_mut(&mut self) -> &mut [MaybeUninit<u8>] {
        // Reinterprets the chunk returned by `chunk_mut` as a slice of `MaybeUninit<u8>`.
        // NOTE(review): this pointer cast relies on the chunk type having the same layout as
        // `[MaybeUninit<u8>]` — confirm against the `bytes` version in use.
        unsafe { &mut *(self.0.chunk_mut() as *mut _ as *mut [MaybeUninit<u8>]) }
    }
}
408
#[cfg(feature = "tokio-02")]
impl<R> CombineRead<BufReader<R>, dyn tokio_02_dep::io::AsyncRead> for Bufferless
where
    R: tokio_02_dep::io::AsyncRead,
{
    /// Polls the reader wrapped by `read` into the buffer stored next to it, exposed to
    /// tokio 0.2 through the `Bytes05` adapter.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut BufReader<R>>,
    ) -> Poll<io::Result<usize>> {
        let this = read.project();
        let (reader, bytes) = (this.inner, this.buf);

        if !bytes.has_remaining_mut() {
            bytes.reserve(8 * 1024);
        }
        let mut target = Bytes05(bytes);
        tokio_02_dep::io::AsyncRead::poll_read_buf(reader, cx, &mut target)
    }
}
427
#[cfg(feature = "tokio-03")]
impl<R> CombineRead<BufReader<R>, dyn tokio_03_dep::io::AsyncRead> for Bufferless
where
    R: tokio_03_dep::io::AsyncRead,
{
    /// Polls the reader wrapped by `read` into the buffer stored next to it.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut BufReader<R>>,
    ) -> Poll<io::Result<usize>> {
        let this = read.project();
        let (reader, bytes) = (this.inner, this.buf);

        tokio_03_read_buf(cx, reader, bytes)
    }
}
443
#[cfg(feature = "tokio")]
impl<R> CombineRead<BufReader<R>, dyn tokio_dep::io::AsyncRead> for Bufferless
where
    R: tokio_dep::io::AsyncRead,
{
    /// Polls the reader wrapped by `read` into the buffer stored next to it.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut BufReader<R>>,
    ) -> Poll<io::Result<usize>> {
        let this = read.project();
        let (reader, bytes) = (this.inner, this.buf);

        tokio_read_buf(reader, cx, bytes)
    }
}
459
#[cfg(feature = "futures-03")]
impl<R> CombineAsyncRead<BufReader<R>> for Bufferless
where
    R: futures_io_03::AsyncRead,
{
    /// Polls the reader wrapped by `read` into the buffer stored next to it.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut BufReader<R>>,
    ) -> Poll<io::Result<usize>> {
        let this = read.project();
        let (reader, bytes) = (this.inner, this.buf);

        poll_extend_buf(bytes, cx, reader)
    }

    /// Prepares the buffer inside `read` and returns the future that performs the read.
    fn extend_buf<'a>(
        &'a mut self,
        mut read: Pin<&'a mut BufReader<R>>,
    ) -> ExtendBuf<'a, Self, BufReader<R>> {
        {
            let this = read.as_mut().project();
            let bytes = this.buf;

            if !bytes.has_remaining_mut() {
                bytes.reserve(8 * 1024);
            }
            // Copy of tokio's read_buf method (but it has to force initialize the buffer)
            let spare = bytes.chunk_mut();
            let spare_len = spare.len();
            for idx in 0..spare_len {
                spare.write_byte(idx, 0);
            }
        }
        ExtendBuf { buffer: self, read }
    }
}
493
#[cfg(feature = "futures-03")]
/// Polls a `futures` 0.3 reader once and appends the bytes it produced to `buf`.
///
/// The whole chunk returned by `buf.chunk_mut()` is zeroed on every call so that it can be
/// handed to `poll_read` as `&mut [u8]`.
///
/// Resolves to the number of bytes appended. `Poll::Pending` and reader errors are returned
/// (through `ready!` and `?`) without advancing `buf`.
///
/// # Panics
///
/// Panics if the reader claims to have read more bytes than the slice it was given.
fn poll_extend_buf<R>(
    buf: &mut BytesMut,
    cx: &mut Context<'_>,
    read: Pin<&mut R>,
) -> Poll<io::Result<usize>>
where
    R: futures_io_03::AsyncRead,
{
    // Copy of tokio's read_buf method (but it has to force initialize the buffer)
    let n = {
        let bs = buf.chunk_mut();
        // preinit the buffer
        for i in 0..bs.len() {
            bs.write_byte(i, 0);
        }

        // Convert to `&mut [u8]`
        // SAFETY: preinitialize the buffer
        let bs = unsafe { &mut *(bs as *mut _ as *mut [u8]) };

        let n = ready!(read.poll_read(cx, bs))?;
        // Checked before `advance_mut` so the buffer is never advanced past the zeroed region.
        assert!(
            n <= bs.len(),
            "AsyncRead reported that it initialized more than the number of bytes in the buffer"
        );
        n
    };
    // SAFETY: the buffer was preinitialized
    unsafe { buf.advance_mut(n) };
    Poll::Ready(Ok(n))
}
526
#[cfg(feature = "tokio-02")]
// tokio 0.2 `AsyncRead`: serves reads from the internal buffer, refilling it through
// `poll_fill_buf` when it is empty.
impl<R: tokio_02_dep::io::AsyncRead> tokio_02_dep::io::AsyncRead for BufReader<R> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        use tokio_02_dep::io::AsyncBufRead;

        // If we don't have any buffered data and we're doing a massive read
        // (larger than our internal buffer), bypass our internal buffer
        // entirely.
        // NOTE(review): `has_remaining_mut` asks whether more bytes can still be written into
        // `self.buf`, not whether `self.buf` is empty, so this condition may not match the
        // description above — confirm the intended check.
        if !self.buf.has_remaining_mut() && buf.len() >= self.buf.len() {
            // Reads straight from the wrapped reader, then drops whatever was buffered.
            let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
            self.discard_buffer();
            return Poll::Ready(res);
        }
        // Copy out of the (possibly just refilled) buffer and mark the copied bytes consumed.
        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
        let nread = rem.read(buf)?;
        self.consume(nread);
        Poll::Ready(Ok(nread))
    }

    // we can't skip unconditionally because of the large buffer case in read.
    // The decision is delegated to the wrapped reader.
    unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
        self.inner.prepare_uninitialized_buffer(buf)
    }
}
555
#[cfg(feature = "tokio-02")]
impl<R: tokio_02_dep::io::AsyncRead> tokio_02_dep::io::AsyncBufRead for BufReader<R> {
    /// Returns the buffered bytes, polling the wrapped reader first when nothing is buffered.
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
        let this = self.project();
        let (reader, bytes) = (this.inner, this.buf);

        // Only go to the underlying reader when the internal buffer holds no data.
        if bytes.is_empty() {
            let mut target = Bytes05(&mut *bytes);
            ready!(reader.poll_read_buf(cx, &mut target))?;
        }
        Poll::Ready(Ok(&bytes[..]))
    }

    /// Marks `amt` buffered bytes as consumed.
    fn consume(self: Pin<&mut Self>, amt: usize) {
        let this = self.project();
        Buf::advance(this.buf, amt);
    }
}
577
#[cfg(feature = "tokio-02")]
impl<R: tokio_02_dep::io::AsyncRead + tokio_02_dep::io::AsyncWrite> tokio_02_dep::io::AsyncWrite
    for BufReader<R>
{
    // Writes bypass the read buffer: every method forwards to the pinned inner value.

    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let inner = self.project().inner;
        inner.poll_write(cx, buf)
    }

    fn poll_write_buf<B: bytes_05::Buf>(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut B,
    ) -> Poll<io::Result<usize>> {
        let inner = self.project().inner;
        inner.poll_write_buf(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.project().inner;
        inner.poll_flush(cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.project().inner;
        inner.poll_shutdown(cx)
    }
}
606
#[cfg(feature = "tokio-03")]
impl<R: tokio_03_dep::io::AsyncRead> tokio_03_dep::io::AsyncRead for BufReader<R> {
    /// Serves the read from the internal buffer, refilling it through `poll_fill_buf` when it
    /// is empty.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio_03_dep::io::ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // If we don't have any buffered data and we're doing a massive read
        // (larger than our internal buffer), bypass our internal buffer
        // entirely.
        let bypass = !self.buf.has_remaining_mut() && buf.remaining() >= self.buf.len();
        if bypass {
            let outcome = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
            self.discard_buffer();
            return Poll::Ready(outcome);
        }

        // Copy as much buffered data as fits into `buf`, then mark it consumed.
        let available = ready!(self.as_mut().poll_fill_buf(cx))?;
        let count = available.len().min(buf.remaining());
        buf.put_slice(&available[..count]);
        self.consume(count);
        Poll::Ready(Ok(()))
    }
}
629
#[cfg(feature = "tokio-03")]
impl<R: tokio_03_dep::io::AsyncRead> tokio_03_dep::io::AsyncBufRead for BufReader<R> {
    /// Returns the buffered bytes, polling the wrapped reader first when nothing is buffered.
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
        let this = self.project();
        let (reader, bytes) = (this.inner, this.buf);

        // Only go to the underlying reader when the internal buffer holds no data.
        if bytes.is_empty() {
            ready!(tokio_03_read_buf(cx, reader, &mut *bytes))?;
        }
        Poll::Ready(Ok(&bytes[..]))
    }

    /// Marks `amt` buffered bytes as consumed.
    fn consume(self: Pin<&mut Self>, amt: usize) {
        let this = self.project();
        Buf::advance(this.buf, amt);
    }
}
648
#[cfg(feature = "tokio-03")]
impl<R: tokio_03_dep::io::AsyncRead + tokio_03_dep::io::AsyncWrite> tokio_03_dep::io::AsyncWrite
    for BufReader<R>
{
    // Writes bypass the read buffer: every method forwards to the pinned inner value.

    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let inner = self.project().inner;
        inner.poll_write(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.project().inner;
        inner.poll_flush(cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.project().inner;
        inner.poll_shutdown(cx)
    }
}
669
#[cfg(feature = "tokio")]
impl<R: tokio_dep::io::AsyncRead> tokio_dep::io::AsyncRead for BufReader<R> {
    /// Serves the read from the internal buffer, refilling it through `poll_fill_buf` when it
    /// is empty.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio_dep::io::ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // If we don't have any buffered data and we're doing a massive read
        // (larger than our internal buffer), bypass our internal buffer
        // entirely.
        let bypass = !self.buf.has_remaining_mut() && buf.remaining() >= self.buf.len();
        if bypass {
            let outcome = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
            self.discard_buffer();
            return Poll::Ready(outcome);
        }

        // Copy as much buffered data as fits into `buf`, then mark it consumed.
        let available = ready!(self.as_mut().poll_fill_buf(cx))?;
        let count = available.len().min(buf.remaining());
        buf.put_slice(&available[..count]);
        self.consume(count);
        Poll::Ready(Ok(()))
    }
}
692
#[cfg(feature = "tokio")]
impl<R: tokio_dep::io::AsyncRead> tokio_dep::io::AsyncBufRead for BufReader<R> {
    /// Returns the buffered bytes, polling the wrapped reader first when nothing is buffered.
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
        let this = self.project();
        let (reader, bytes) = (this.inner, this.buf);

        // Only go to the underlying reader when the internal buffer holds no data.
        if bytes.is_empty() {
            ready!(tokio_read_buf(reader, cx, &mut *bytes))?;
        }
        Poll::Ready(Ok(&bytes[..]))
    }

    /// Marks `amt` buffered bytes as consumed.
    fn consume(self: Pin<&mut Self>, amt: usize) {
        let this = self.project();
        Buf::advance(this.buf, amt);
    }
}
711
#[cfg(feature = "tokio")]
impl<R: tokio_dep::io::AsyncRead + tokio_dep::io::AsyncWrite> tokio_dep::io::AsyncWrite
    for BufReader<R>
{
    // Writes bypass the read buffer: every method forwards to the pinned inner value.

    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let inner = self.project().inner;
        inner.poll_write(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.project().inner;
        inner.poll_flush(cx)
    }

    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.project().inner;
        inner.poll_shutdown(cx)
    }
}
732
733impl<R: Read> Read for BufReader<R> {
734    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
735        // If we don't have any buffered data and we're doing a massive read
736        // (larger than our internal buffer), bypass our internal buffer
737        // entirely.
738        if !self.buf.has_remaining_mut() && buf.len() >= self.buf.len() {
739            let res = self.read(buf);
740            self.buf.clear();
741            return res;
742        }
743        let nread = {
744            let mut rem = self.fill_buf()?;
745            rem.read(buf)?
746        };
747        self.consume(nread);
748        Ok(nread)
749    }
750}
751
752impl<R: Read> BufRead for BufReader<R> {
753    fn fill_buf(&mut self) -> io::Result<&[u8]> {
754        // If we've reached the end of our internal buffer then we need to fetch
755        // some more data from the underlying reader.
756        // Branch using `>=` instead of the more correct `==`
757        // to tell the compiler that the pos..cap slice is always valid.
758
759        if self.buf.is_empty() {
760            Bufferless.extend_buf_sync(self)?;
761        }
762        Ok(&self.buf[..])
763    }
764
765    fn consume(&mut self, amt: usize) {
766        self.buf.advance(amt);
767    }
768}
769
#[cfg(test)]
#[cfg(feature = "tokio-02")]
// Tests of `BufReader` against tokio 0.2, all reading the same ten-byte slice through a reader
// created with capacity 3.
mod tests {
    use super::{BufReader, Bufferless, CombineRead};

    use std::{io, pin::Pin};

    use {
        bytes_05::BytesMut,
        tokio_02_dep::{
            self as tokio,
            io::{AsyncRead, AsyncReadExt},
        },
    };

    impl<R: AsyncRead> BufReader<R> {
        // Test-only helper: awaits a single `Bufferless::poll_extend_buf` on this reader.
        async fn extend_buf_tokio_02(mut self: Pin<&mut Self>) -> io::Result<usize> {
            crate::future_ext::poll_fn(|cx| Bufferless.poll_extend_buf(cx, self.as_mut())).await
        }
    }

    // Reads through `AsyncReadExt::read` in three-byte steps.
    #[tokio::test]
    async fn buf_reader() {
        let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);

        let mut buf = [0u8; 3];
        read.read(&mut buf).await.unwrap();
        assert_eq!(buf, [1, 2, 3]);

        let mut buf = [0u8; 3];
        read.read(&mut buf).await.unwrap();
        assert_eq!(buf, [4, 5, 6]);

        let mut buf = [0u8; 3];
        read.read(&mut buf).await.unwrap();
        assert_eq!(buf, [7, 8, 9]);

        // Only one byte is left, so the trailing `1`s of the pre-filled buffer survive.
        let mut buf = [1u8; 3];
        read.read(&mut buf).await.unwrap();
        assert_eq!(buf, [0, 1, 1]);
    }

    // Reads through `AsyncReadExt::read_buf` into a growing `BytesMut`.
    #[tokio::test]
    async fn buf_reader_buf() {
        let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);

        let mut buf = BytesMut::with_capacity(3);
        read.read_buf(&mut buf).await.unwrap();
        assert_eq!(&buf[..], [1, 2, 3]);

        // The second call appends the remaining seven bytes.
        read.read_buf(&mut buf).await.unwrap();
        assert_eq!(&buf[..], [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
    }

    // Fills the internal buffer directly and checks that data accumulates until consumed.
    #[tokio::test]
    async fn buf_reader_extend_buf() {
        let read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
        futures_03_dep::pin_mut!(read);

        assert_eq!(read.as_mut().extend_buf_tokio_02().await.unwrap(), 3);
        assert_eq!(read.buffer(), [1, 2, 3]);

        assert_eq!(read.as_mut().extend_buf_tokio_02().await.unwrap(), 7);
        assert_eq!(read.buffer(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
    }
}
836
#[cfg(test)]
#[cfg(feature = "tokio")]
// Tests of `BufReader` against tokio 1.x, all reading the same ten-byte slice through a reader
// created with capacity 3.
mod tests_tokio_1 {
    use super::{BufReader, Bufferless, CombineRead};

    use std::{io, pin::Pin};

    use {
        bytes::BytesMut,
        tokio_dep::{
            self as tokio,
            io::{AsyncRead, AsyncReadExt},
        },
    };

    impl<R: AsyncRead> BufReader<R> {
        // Test-only helper: awaits a single `Bufferless::poll_extend_buf` on this reader.
        async fn extend_buf_tokio(mut self: Pin<&mut Self>) -> io::Result<usize> {
            crate::future_ext::poll_fn(|cx| Bufferless.poll_extend_buf(cx, self.as_mut())).await
        }
    }

    // Reads through `AsyncReadExt::read` in three-byte steps.
    #[tokio::test]
    async fn buf_reader() {
        let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);

        let mut buf = [0u8; 3];
        read.read(&mut buf).await.unwrap();
        assert_eq!(buf, [1, 2, 3]);

        let mut buf = [0u8; 3];
        read.read(&mut buf).await.unwrap();
        assert_eq!(buf, [4, 5, 6]);

        let mut buf = [0u8; 3];
        read.read(&mut buf).await.unwrap();
        assert_eq!(buf, [7, 8, 9]);

        // Only one byte is left, so the trailing `1`s of the pre-filled buffer survive.
        let mut buf = [1u8; 3];
        read.read(&mut buf).await.unwrap();
        assert_eq!(buf, [0, 1, 1]);
    }

    // Reads through `AsyncReadExt::read_buf` into a growing `BytesMut`.
    #[tokio::test]
    async fn buf_reader_buf() {
        let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);

        let mut buf = BytesMut::with_capacity(3);
        read.read_buf(&mut buf).await.unwrap();
        assert_eq!(&buf[..], [1, 2, 3]);

        // The second call appends the remaining seven bytes.
        read.read_buf(&mut buf).await.unwrap();
        assert_eq!(&buf[..], [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
    }

    // Fills the internal buffer directly and checks that data accumulates until consumed.
    #[tokio::test]
    async fn buf_reader_extend_buf() {
        let read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);
        futures_03_dep::pin_mut!(read);

        assert_eq!(read.as_mut().extend_buf_tokio().await.unwrap(), 3);
        assert_eq!(read.buffer(), [1, 2, 3]);

        assert_eq!(read.as_mut().extend_buf_tokio().await.unwrap(), 7);
        assert_eq!(read.buffer(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
    }
}
903
#[cfg(test)]
// Tests of the synchronous `Read` and `CombineSyncRead` paths, reading a ten-byte slice through
// a reader created with capacity 3.
mod tests_sync {
    use super::{BufReader, Bufferless, CombineSyncRead};

    use std::io::Read;

    // Reads through `Read::read` in three-byte steps. The returned byte counts are ignored on
    // purpose (hence the clippy allowance); the buffer contents are what is asserted.
    #[test]
    #[allow(clippy::unused_io_amount)]
    fn buf_reader() {
        let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);

        let mut buf = [0u8; 3];
        read.read(&mut buf).unwrap();
        assert_eq!(buf, [1, 2, 3]);

        let mut buf = [0u8; 3];
        read.read(&mut buf).unwrap();
        assert_eq!(buf, [4, 5, 6]);

        let mut buf = [0u8; 3];
        read.read(&mut buf).unwrap();
        assert_eq!(buf, [7, 8, 9]);

        // Only one byte is left, so the trailing `1`s of the pre-filled buffer survive.
        let mut buf = [1u8; 3];
        read.read(&mut buf).unwrap();
        assert_eq!(buf, [0, 1, 1]);
    }

    // Fills the internal buffer directly and checks that data accumulates until consumed.
    #[test]
    fn buf_reader_extend_buf() {
        let mut read = BufReader::with_capacity(3, &[1u8, 2, 3, 4, 5, 6, 7, 8, 9, 0][..]);

        assert_eq!(Bufferless.extend_buf_sync(&mut read).unwrap(), 3);
        assert_eq!(read.buffer(), [1, 2, 3]);

        assert_eq!(Bufferless.extend_buf_sync(&mut read).unwrap(), 7);
        assert_eq!(read.buffer(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
    }
}