1use std::io::{self, BufRead, Read};
2
3#[cfg(any(
4 feature = "futures-03",
5 feature = "tokio-02",
6 feature = "tokio-03",
7 feature = "tokio"
8))]
9use std::pin::Pin;
10
11#[cfg(any(feature = "futures-03", feature = "tokio-02", feature = "tokio-03"))]
12use std::mem::MaybeUninit;
13
14#[cfg(feature = "futures-core-03")]
15use std::task::{Context, Poll};
16
17#[cfg(feature = "futures-03")]
18use std::future::Future;
19
20use bytes::{Buf, BufMut, BytesMut};
21
22#[cfg(feature = "pin-project-lite")]
23use pin_project_lite::pin_project;
24
25#[cfg(feature = "tokio-03")]
26use tokio_03_dep::io::AsyncBufRead as _;
27
28#[cfg(feature = "tokio")]
29use tokio_dep::io::AsyncBufRead as _;
30
31#[cfg(feature = "futures-core-03")]
32use futures_core_03::ready;
33
#[cfg(feature = "pin-project-lite")]
pin_project! {
    /// A reader that pairs an inner reader `R` with a `BytesMut` holding bytes
    /// that have been read from it but not consumed yet.
    ///
    /// This definition is used when the `pin-project-lite` feature is enabled;
    /// `inner` is marked `#[pin]` so it can be reached through `Pin<&mut Self>`.
    #[derive(Debug)]
    pub struct BufReader<R> {
        // The wrapped reader (pin-projected).
        #[pin]
        inner: R,
        // Buffered bytes that have not been consumed yet.
        buf: BytesMut
    }
}
46
/// A reader that pairs an inner reader `R` with a `BytesMut` holding bytes
/// that have been read from it but not consumed yet.
///
/// This definition is used when the `pin-project-lite` feature is disabled.
#[cfg(not(feature = "pin-project-lite"))]
#[derive(Debug)]
pub struct BufReader<R> {
    /// The wrapped reader.
    inner: R,
    /// Buffered bytes that have not been consumed yet.
    buf: BytesMut,
}
56
57impl<R> BufReader<R> {
58 pub fn new(inner: R) -> Self {
61 Self::with_capacity(8096, inner)
62 }
63
64 pub fn with_capacity(capacity: usize, inner: R) -> Self {
66 let buf = BytesMut::with_capacity(capacity);
67
68 Self { inner, buf }
69 }
70
71 pub fn get_ref(&self) -> &R {
75 &self.inner
76 }
77
78 pub fn get_mut(&mut self) -> &mut R {
82 &mut self.inner
83 }
84
85 #[cfg(feature = "pin-project-lite")]
86 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
90 self.project().inner
91 }
92
93 pub fn into_inner(self) -> R {
97 self.inner
98 }
99
100 pub fn buffer(&self) -> &[u8] {
104 &self.buf
105 }
106
107 #[inline]
109 #[cfg(any(feature = "tokio-02", feature = "tokio-03", feature = "tokio"))]
110 fn discard_buffer(self: Pin<&mut Self>) {
111 let me = self.project();
112 me.buf.clear();
113 }
114}
115
// Private module holding the marker trait used as a supertrait of
// `CombineBuffer`; because the module is private, the trait cannot be named
// from outside this module's parent.
mod sealed {
    /// Marker supertrait implemented only for the buffer strategies in this file.
    pub trait Sealed {}
}
119
/// Access to buffered input bytes, given both the buffer strategy (`self`)
/// and the reader `R`.
///
/// In this file, `Buffer` keeps the bytes itself and ignores `read`, while
/// `Bufferless` uses the buffer stored inside a `BufReader`.
#[doc(hidden)]
pub trait CombineBuffer<R>: sealed::Sealed {
    /// Returns the currently buffered bytes.
    fn buffer<'a>(&'a self, read: &'a R) -> &'a [u8];

    /// Drops the first `len` buffered bytes.
    fn advance(&mut self, read: &mut R, len: usize);

    /// Same as `advance`, for a pinned reader.
    #[cfg(feature = "pin-project-lite")]
    fn advance_pin(&mut self, read: Pin<&mut R>, len: usize);
}
129
/// Buffer strategies that can be extended from a synchronous reader.
#[doc(hidden)]
pub trait CombineSyncRead<R>: CombineBuffer<R> {
    /// Reads more bytes from `read` into the buffer and returns how many
    /// bytes were added.
    fn extend_buf_sync(&mut self, read: &mut R) -> io::Result<usize>;
}
134
/// Buffer strategies that can be extended by polling a tokio-style reader.
///
/// `T` is a marker only: the implementations in this file pass the
/// `dyn AsyncRead` type of the tokio version they target, which keeps the
/// impls for different tokio versions distinct.
#[cfg(any(feature = "tokio-02", feature = "tokio-03", feature = "tokio"))]
#[doc(hidden)]
pub trait CombineRead<R, T: ?Sized>: CombineBuffer<R> {
    /// Polls `read` for more bytes, appends them to the buffer, and yields
    /// the number of bytes added.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut R>,
    ) -> Poll<io::Result<usize>>;
}
144
/// Buffer strategies that can be extended from a `futures` 0.3 `AsyncRead`.
#[cfg(feature = "futures-03")]
#[doc(hidden)]
pub trait CombineAsyncRead<R>: CombineBuffer<R> {
    /// Polls `read` for more bytes, appends them to the buffer, and yields
    /// the number of bytes added.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut R>,
    ) -> Poll<io::Result<usize>>;

    /// Returns an `ExtendBuf` future that borrows both the buffer and the
    /// reader; polling it forwards to `poll_extend_buf`.
    fn extend_buf<'a>(&'a mut self, read: Pin<&'a mut R>) -> ExtendBuf<'a, Self, R>
    where
        Self: Sized;
}
158
#[cfg(feature = "futures-03")]
pin_project_lite::pin_project! {
    /// Future returned by `CombineAsyncRead::extend_buf`.
    ///
    /// It resolves to the number of bytes appended to the buffer.
    #[doc(hidden)]
    pub struct ExtendBuf<'a, C, R> {
        // The buffer strategy being extended.
        buffer: &'a mut C,
        // The pinned reader to pull bytes from.
        read: Pin<&'a mut R>
    }
}
167
#[cfg(feature = "futures-03")]
impl<'a, C, R> Future for ExtendBuf<'a, C, R>
where
    C: CombineAsyncRead<R>,
{
    type Output = io::Result<usize>;

    /// Forwards one poll to `CombineAsyncRead::poll_extend_buf` on the
    /// borrowed buffer, handing it a re-borrowed pin of the reader.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let reader = this.read.as_mut();
        this.buffer.poll_extend_buf(cx, reader)
    }
}
180
/// Buffer strategy that owns its `BytesMut`.
///
/// Its `CombineBuffer` implementation ignores the reader argument and works
/// on the wrapped `BytesMut` only.
#[derive(Default)]
pub struct Buffer(pub(crate) BytesMut);

// Opt `Buffer` into the sealed `CombineBuffer` trait family.
impl sealed::Sealed for Buffer {}
186
187impl<R> CombineBuffer<R> for Buffer {
188 fn buffer<'a>(&'a self, _read: &'a R) -> &'a [u8] {
189 &self.0
190 }
191
192 fn advance(&mut self, _read: &mut R, len: usize) {
193 self.0.advance(len);
194 }
195
196 #[cfg(feature = "pin-project-lite")]
197 fn advance_pin(&mut self, _read: Pin<&mut R>, len: usize) {
198 self.0.advance(len);
199 }
200}
201
202impl<R> CombineSyncRead<R> for Buffer
203where
204 R: Read,
205{
206 fn extend_buf_sync(&mut self, read: &mut R) -> io::Result<usize> {
207 extend_buf_sync(&mut self.0, read)
208 }
209}
210
#[cfg(feature = "futures-03")]
impl<R: futures_io_03::AsyncRead> CombineAsyncRead<R> for Buffer {
    /// Polls `read` once and appends what it produced to the owned buffer.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut R>,
    ) -> Poll<io::Result<usize>> {
        let Buffer(bytes) = self;
        poll_extend_buf(bytes, cx, read)
    }

    /// Prepares the owned buffer (reserving 8 KiB when it reports no spare
    /// room, then zero-filling the spare chunk) and returns the future that
    /// performs the read.
    fn extend_buf<'a>(&'a mut self, read: Pin<&'a mut R>) -> ExtendBuf<'a, Self, R> {
        let bytes = &mut self.0;
        if !bytes.has_remaining_mut() {
            bytes.reserve(8 * 1024);
        }

        // Write a zero into every byte of the spare chunk.
        let spare = bytes.chunk_mut();
        for idx in 0..spare.len() {
            spare.write_byte(idx, 0);
        }

        ExtendBuf { buffer: self, read }
    }
}
237
#[cfg(feature = "tokio-02")]
impl<R: tokio_02_dep::io::AsyncRead> CombineRead<R, dyn tokio_02_dep::io::AsyncRead> for Buffer {
    /// Reserves 8 KiB when the owned buffer reports no spare room, then lets
    /// the tokio 0.2 reader fill it through the `Bytes05` adapter.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut R>,
    ) -> Poll<io::Result<usize>> {
        let bytes = &mut self.0;
        if !bytes.has_remaining_mut() {
            bytes.reserve(8 * 1024);
        }
        let mut adapter = Bytes05(bytes);
        read.poll_read_buf(cx, &mut adapter)
    }
}
254
/// Wraps the spare (possibly uninitialized) chunk of `bs` in a tokio 0.3
/// `ReadBuf` so a reader can fill it in place.
#[cfg(feature = "tokio-03")]
fn tokio_03_to_read_buf(bs: &mut BytesMut) -> tokio_03_dep::io::ReadBuf<'_> {
    let uninit = bs.chunk_mut();
    // SAFETY (review): the pointer and length both come from the same chunk
    // of `bs`, and the memory is only exposed as `MaybeUninit<u8>`, so no
    // initialized-ness is assumed here.
    unsafe {
        tokio_03_dep::io::ReadBuf::uninit(std::slice::from_raw_parts_mut(
            uninit.as_mut_ptr() as *mut MaybeUninit<u8>,
            uninit.len(),
        ))
    }
}
265
#[cfg(feature = "tokio-03")]
impl<R: tokio_03_dep::io::AsyncRead> CombineRead<R, dyn tokio_03_dep::io::AsyncRead> for Buffer {
    /// Polls the tokio 0.3 reader once and appends what it produced to the
    /// owned buffer.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut R>,
    ) -> Poll<io::Result<usize>> {
        let Buffer(bytes) = self;
        tokio_03_read_buf(cx, read, bytes)
    }
}
279
/// Polls a tokio 0.3 reader once, writing into the spare chunk of `bs`, and
/// yields the number of bytes appended.
///
/// When `bs` reports no remaining room, 8 KiB is reserved first.
#[cfg(feature = "tokio-03")]
fn tokio_03_read_buf(
    cx: &mut Context<'_>,
    read: Pin<&mut impl tokio_03_dep::io::AsyncRead>,
    bs: &mut bytes::BytesMut,
) -> Poll<io::Result<usize>> {
    if !bs.has_remaining_mut() {
        bs.reserve(8 * 1024);
    }

    let mut buf = tokio_03_to_read_buf(bs);
    ready!(read.poll_read(cx, &mut buf))?;
    // SAFETY (review): `n` is the length of the region the `ReadBuf` reports
    // as filled, so the buffer is advanced only over bytes the reader wrote.
    unsafe {
        let n = buf.filled().len();
        bs.advance_mut(n);
        Poll::Ready(Ok(n))
    }
}
298
#[cfg(feature = "tokio")]
impl<R: tokio_dep::io::AsyncRead> CombineRead<R, dyn tokio_dep::io::AsyncRead> for Buffer {
    /// Polls the tokio reader once and appends what it produced to the owned
    /// buffer.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut R>,
    ) -> Poll<io::Result<usize>> {
        let Buffer(bytes) = self;
        tokio_read_buf(read, cx, bytes)
    }
}
312
/// Polls a tokio reader once through `tokio_util::io::poll_read_buf`,
/// appending to `bs`; 8 KiB is reserved first when `bs` reports no room.
#[cfg(feature = "tokio")]
fn tokio_read_buf(
    read: Pin<&mut impl tokio_dep::io::AsyncRead>,
    cx: &mut Context<'_>,
    bs: &mut bytes::BytesMut,
) -> Poll<io::Result<usize>> {
    let out_of_room = !bs.has_remaining_mut();
    if out_of_room {
        bs.reserve(8 * 1024);
    }

    tokio_util::io::poll_read_buf(read, cx, bs)
}
325
/// Buffer strategy that stores nothing itself.
///
/// Its `CombineBuffer` implementation is defined for `BufReader<R>` and uses
/// the buffer held inside that reader.
#[derive(Default)]
pub struct Bufferless;

// Opt `Bufferless` into the sealed `CombineBuffer` trait family.
impl sealed::Sealed for Bufferless {}
331
332impl<R> CombineBuffer<BufReader<R>> for Bufferless {
333 fn buffer<'a>(&'a self, read: &'a BufReader<R>) -> &'a [u8] {
334 &read.buf
335 }
336
337 fn advance(&mut self, read: &mut BufReader<R>, len: usize) {
338 read.buf.advance(len);
339 }
340
341 #[cfg(feature = "pin-project-lite")]
342 fn advance_pin(&mut self, read: Pin<&mut BufReader<R>>, len: usize) {
343 read.project().buf.advance(len);
344 }
345}
346
347impl<R> CombineSyncRead<BufReader<R>> for Bufferless
348where
349 R: Read,
350{
351 fn extend_buf_sync(&mut self, read: &mut BufReader<R>) -> io::Result<usize> {
352 extend_buf_sync(&mut read.buf, &mut read.inner)
353 }
354}
355
356fn extend_buf_sync<R>(buf: &mut BytesMut, read: &mut R) -> io::Result<usize>
357where
358 R: Read,
359{
360 let size = 8 * 1024;
361 if !buf.has_remaining_mut() {
362 buf.reserve(size);
363 }
364
365 let n = {
367 let bs = buf.chunk_mut();
368
369 let initial_size = bs.len().min(size);
370 let bs = &mut bs[..initial_size];
371 for i in 0..bs.len() {
372 bs.write_byte(i, 0);
373 }
374
375 let bs = unsafe { &mut *(bs as *mut _ as *mut [u8]) };
378
379 let n = read.read(bs)?;
380 assert!(
381 n <= bs.len(),
382 "AsyncRead reported that it initialized more than the number of bytes in the buffer"
383 );
384 n
385 };
386
387 unsafe { buf.advance_mut(n) };
389
390 Ok(n)
391}
392
/// Adapter that exposes a borrowed `BytesMut` through the `bytes_05::BufMut`
/// trait, which is what the tokio 0.2 `poll_read_buf` calls in this file take.
#[cfg(feature = "tokio-02")]
struct Bytes05<'a>(&'a mut BytesMut);

#[cfg(feature = "tokio-02")]
impl bytes_05::BufMut for Bytes05<'_> {
    // Forwarded unchanged to the wrapped `BytesMut`.
    fn remaining_mut(&self) -> usize {
        self.0.remaining_mut()
    }
    // Forwarded unchanged; the caller's obligations are those of the
    // wrapped `advance_mut`.
    unsafe fn advance_mut(&mut self, cnt: usize) {
        self.0.advance_mut(cnt)
    }
    fn bytes_mut(&mut self) -> &mut [MaybeUninit<u8>] {
        // NOTE(review): this cast assumes the type returned by `chunk_mut`
        // has the same layout as `[MaybeUninit<u8>]` — confirm against the
        // `bytes` documentation.
        unsafe { &mut *(self.0.chunk_mut() as *mut _ as *mut [MaybeUninit<u8>]) }
    }
}
408
#[cfg(feature = "tokio-02")]
impl<R: tokio_02_dep::io::AsyncRead> CombineRead<BufReader<R>, dyn tokio_02_dep::io::AsyncRead>
    for Bufferless
{
    /// Reserves 8 KiB when the `BufReader`'s buffer reports no spare room,
    /// then lets the inner tokio 0.2 reader fill it through `Bytes05`.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut BufReader<R>>,
    ) -> Poll<io::Result<usize>> {
        let this = read.project();

        if !this.buf.has_remaining_mut() {
            this.buf.reserve(8 * 1024);
        }
        let mut adapter = Bytes05(this.buf);
        tokio_02_dep::io::AsyncRead::poll_read_buf(this.inner, cx, &mut adapter)
    }
}
427
#[cfg(feature = "tokio-03")]
impl<R: tokio_03_dep::io::AsyncRead> CombineRead<BufReader<R>, dyn tokio_03_dep::io::AsyncRead>
    for Bufferless
{
    /// Polls the inner tokio 0.3 reader once and appends what it produced to
    /// the `BufReader`'s buffer.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut BufReader<R>>,
    ) -> Poll<io::Result<usize>> {
        let this = read.project();
        tokio_03_read_buf(cx, this.inner, this.buf)
    }
}
443
#[cfg(feature = "tokio")]
impl<R: tokio_dep::io::AsyncRead> CombineRead<BufReader<R>, dyn tokio_dep::io::AsyncRead>
    for Bufferless
{
    /// Polls the inner tokio reader once and appends what it produced to the
    /// `BufReader`'s buffer.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut BufReader<R>>,
    ) -> Poll<io::Result<usize>> {
        let this = read.project();
        tokio_read_buf(this.inner, cx, this.buf)
    }
}
459
#[cfg(feature = "futures-03")]
impl<R: futures_io_03::AsyncRead> CombineAsyncRead<BufReader<R>> for Bufferless {
    /// Polls the inner reader once and appends what it produced to the
    /// `BufReader`'s buffer.
    fn poll_extend_buf(
        &mut self,
        cx: &mut Context<'_>,
        read: Pin<&mut BufReader<R>>,
    ) -> Poll<io::Result<usize>> {
        let this = read.project();
        poll_extend_buf(this.buf, cx, this.inner)
    }

    /// Prepares the `BufReader`'s buffer (reserving 8 KiB when it reports no
    /// spare room, then zero-filling the spare chunk) and returns the future
    /// that performs the read.
    fn extend_buf<'a>(
        &'a mut self,
        mut read: Pin<&'a mut BufReader<R>>,
    ) -> ExtendBuf<'a, Self, BufReader<R>> {
        {
            let this = read.as_mut().project();

            if !this.buf.has_remaining_mut() {
                this.buf.reserve(8 * 1024);
            }

            // Write a zero into every byte of the spare chunk.
            let spare = this.buf.chunk_mut();
            for idx in 0..spare.len() {
                spare.write_byte(idx, 0);
            }
        }

        ExtendBuf { buffer: self, read }
    }
}
493
/// Polls a `futures` 0.3 reader once, writing into the spare chunk of `buf`,
/// and yields the number of bytes appended.
///
/// The spare chunk is zero-filled first because `poll_read` takes `&mut [u8]`.
///
/// # Panics
///
/// Panics if the reader reports more bytes than the slice it was given holds.
#[cfg(feature = "futures-03")]
fn poll_extend_buf<R>(
    buf: &mut BytesMut,
    cx: &mut Context<'_>,
    read: Pin<&mut R>,
) -> Poll<io::Result<usize>>
where
    R: futures_io_03::AsyncRead,
{
    let n = {
        let bs = buf.chunk_mut();
        // Initialize every byte of the spare chunk.
        for i in 0..bs.len() {
            bs.write_byte(i, 0);
        }

        // SAFETY: the loop above wrote every byte of `bs`, so viewing it as
        // `[u8]` does not expose uninitialized memory.
        let bs = unsafe { &mut *(bs as *mut _ as *mut [u8]) };

        let n = ready!(read.poll_read(cx, bs))?;
        assert!(
            n <= bs.len(),
            "AsyncRead reported that it initialized more than the number of bytes in the buffer"
        );
        n
    };
    // SAFETY: `n <= bs.len()` was asserted above and that whole region was
    // zero-initialized before the read.
    unsafe { buf.advance_mut(n) };
    Poll::Ready(Ok(n))
}
526
#[cfg(feature = "tokio-02")]
impl<R> tokio_02_dep::io::AsyncRead for BufReader<R>
where
    R: tokio_02_dep::io::AsyncRead,
{
    /// Reads into `buf`, normally by filling the internal buffer and copying
    /// out of it.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        use tokio_02_dep::io::AsyncBufRead;

        // Bypass branch: when the internal buffer reports no remaining room
        // and the caller's slice is at least as long as the buffered data,
        // read straight from the inner reader and clear the internal buffer.
        let bypass = !self.buf.has_remaining_mut() && buf.len() >= self.buf.len();
        if bypass {
            let outcome = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
            self.discard_buffer();
            return Poll::Ready(outcome);
        }

        let mut available = ready!(self.as_mut().poll_fill_buf(cx))?;
        let copied = available.read(buf)?;
        self.consume(copied);
        Poll::Ready(Ok(copied))
    }

    /// Delegates the decision to the inner reader.
    unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [MaybeUninit<u8>]) -> bool {
        let inner = &self.inner;
        inner.prepare_uninitialized_buffer(buf)
    }
}
555
#[cfg(feature = "tokio-02")]
impl<R> tokio_02_dep::io::AsyncBufRead for BufReader<R>
where
    R: tokio_02_dep::io::AsyncRead,
{
    /// Returns the buffered bytes, first polling the inner reader for more
    /// when the buffer is empty.
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
        let this = self.project();

        if this.buf.is_empty() {
            let mut adapter = Bytes05(this.buf);
            ready!(this.inner.poll_read_buf(cx, &mut adapter))?;
        }
        Poll::Ready(Ok(&this.buf[..]))
    }

    /// Drops the first `amt` buffered bytes.
    fn consume(self: Pin<&mut Self>, amt: usize) {
        self.project().buf.advance(amt);
    }
}
577
// Write operations are forwarded to the inner value without buffering.
#[cfg(feature = "tokio-02")]
impl<R> tokio_02_dep::io::AsyncWrite for BufReader<R>
where
    R: tokio_02_dep::io::AsyncRead + tokio_02_dep::io::AsyncWrite,
{
    /// Forwards to the inner writer.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let inner = self.get_pin_mut();
        inner.poll_write(cx, buf)
    }

    /// Forwards to the inner writer.
    fn poll_write_buf<B: bytes_05::Buf>(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut B,
    ) -> Poll<io::Result<usize>> {
        let inner = self.get_pin_mut();
        inner.poll_write_buf(cx, buf)
    }

    /// Forwards to the inner writer.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.get_pin_mut();
        inner.poll_flush(cx)
    }

    /// Forwards to the inner writer.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.get_pin_mut();
        inner.poll_shutdown(cx)
    }
}
606
#[cfg(feature = "tokio-03")]
impl<R> tokio_03_dep::io::AsyncRead for BufReader<R>
where
    R: tokio_03_dep::io::AsyncRead,
{
    /// Reads into `buf`, normally by filling the internal buffer and copying
    /// as much of it as fits.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio_03_dep::io::ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // Bypass branch: when the internal buffer reports no remaining room
        // and the caller has at least as much space as the buffered data,
        // read straight from the inner reader and clear the internal buffer.
        let bypass = !self.buf.has_remaining_mut() && buf.remaining() >= self.buf.len();
        if bypass {
            let outcome = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
            self.discard_buffer();
            return Poll::Ready(outcome);
        }

        let available = ready!(self.as_mut().poll_fill_buf(cx))?;
        let count = available.len().min(buf.remaining());
        buf.put_slice(&available[..count]);
        self.consume(count);
        Poll::Ready(Ok(()))
    }
}
629
#[cfg(feature = "tokio-03")]
impl<R> tokio_03_dep::io::AsyncBufRead for BufReader<R>
where
    R: tokio_03_dep::io::AsyncRead,
{
    /// Returns the buffered bytes, first polling the inner reader for more
    /// when the buffer is empty.
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
        let this = self.project();

        if this.buf.is_empty() {
            ready!(tokio_03_read_buf(cx, this.inner, this.buf))?;
        }
        Poll::Ready(Ok(&this.buf[..]))
    }

    /// Drops the first `amt` buffered bytes.
    fn consume(self: Pin<&mut Self>, amt: usize) {
        self.project().buf.advance(amt);
    }
}
648
// Write operations are forwarded to the inner value without buffering.
#[cfg(feature = "tokio-03")]
impl<R> tokio_03_dep::io::AsyncWrite for BufReader<R>
where
    R: tokio_03_dep::io::AsyncRead + tokio_03_dep::io::AsyncWrite,
{
    /// Forwards to the inner writer.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let inner = self.get_pin_mut();
        inner.poll_write(cx, buf)
    }

    /// Forwards to the inner writer.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.get_pin_mut();
        inner.poll_flush(cx)
    }

    /// Forwards to the inner writer.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.get_pin_mut();
        inner.poll_shutdown(cx)
    }
}
669
#[cfg(feature = "tokio")]
impl<R> tokio_dep::io::AsyncRead for BufReader<R>
where
    R: tokio_dep::io::AsyncRead,
{
    /// Reads into `buf`, normally by filling the internal buffer and copying
    /// as much of it as fits.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio_dep::io::ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // Bypass branch: when the internal buffer reports no remaining room
        // and the caller has at least as much space as the buffered data,
        // read straight from the inner reader and clear the internal buffer.
        let bypass = !self.buf.has_remaining_mut() && buf.remaining() >= self.buf.len();
        if bypass {
            let outcome = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
            self.discard_buffer();
            return Poll::Ready(outcome);
        }

        let available = ready!(self.as_mut().poll_fill_buf(cx))?;
        let count = available.len().min(buf.remaining());
        buf.put_slice(&available[..count]);
        self.consume(count);
        Poll::Ready(Ok(()))
    }
}
692
#[cfg(feature = "tokio")]
impl<R> tokio_dep::io::AsyncBufRead for BufReader<R>
where
    R: tokio_dep::io::AsyncRead,
{
    /// Returns the buffered bytes, first polling the inner reader for more
    /// when the buffer is empty.
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
        let this = self.project();

        if this.buf.is_empty() {
            ready!(tokio_read_buf(this.inner, cx, this.buf))?;
        }
        Poll::Ready(Ok(&this.buf[..]))
    }

    /// Drops the first `amt` buffered bytes.
    fn consume(self: Pin<&mut Self>, amt: usize) {
        self.project().buf.advance(amt);
    }
}
711
// Write operations are forwarded to the inner value without buffering.
#[cfg(feature = "tokio")]
impl<R> tokio_dep::io::AsyncWrite for BufReader<R>
where
    R: tokio_dep::io::AsyncRead + tokio_dep::io::AsyncWrite,
{
    /// Forwards to the inner writer.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let inner = self.get_pin_mut();
        inner.poll_write(cx, buf)
    }

    /// Forwards to the inner writer.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.get_pin_mut();
        inner.poll_flush(cx)
    }

    /// Forwards to the inner writer.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let inner = self.get_pin_mut();
        inner.poll_shutdown(cx)
    }
}
732
733impl<R: Read> Read for BufReader<R> {
734 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
735 if !self.buf.has_remaining_mut() && buf.len() >= self.buf.len() {
739 let res = self.read(buf);
740 self.buf.clear();
741 return res;
742 }
743 let nread = {
744 let mut rem = self.fill_buf()?;
745 rem.read(buf)?
746 };
747 self.consume(nread);
748 Ok(nread)
749 }
750}
751
752impl<R: Read> BufRead for BufReader<R> {
753 fn fill_buf(&mut self) -> io::Result<&[u8]> {
754 if self.buf.is_empty() {
760 Bufferless.extend_buf_sync(self)?;
761 }
762 Ok(&self.buf[..])
763 }
764
765 fn consume(&mut self, amt: usize) {
766 self.buf.advance(amt);
767 }
768}
769
#[cfg(test)]
#[cfg(feature = "tokio-02")]
mod tests {
    use super::{BufReader, Bufferless, CombineRead};

    use std::{io, pin::Pin};

    use {
        bytes_05::BytesMut,
        tokio_02_dep::{
            self as tokio,
            io::{AsyncRead, AsyncReadExt},
        },
    };

    // Ten-byte input shared by every test in this module.
    const INPUT: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 0];

    impl<R: AsyncRead> BufReader<R> {
        // Test helper: drives `Bufferless::poll_extend_buf` to completion.
        async fn extend_buf_tokio_02(mut self: Pin<&mut Self>) -> io::Result<usize> {
            let mut strategy = Bufferless;
            crate::future_ext::poll_fn(|cx| strategy.poll_extend_buf(cx, self.as_mut())).await
        }
    }

    #[tokio::test]
    async fn buf_reader() {
        let mut read = BufReader::with_capacity(3, INPUT);

        // The first three reads each fill a fresh 3-byte buffer.
        for expected in &[[1u8, 2, 3], [4, 5, 6], [7, 8, 9]] {
            let mut buf = [0u8; 3];
            read.read(&mut buf).await.unwrap();
            assert_eq!(&buf, expected);
        }

        // Only one byte is left, so the trailing bytes keep their initial value.
        let mut buf = [1u8; 3];
        read.read(&mut buf).await.unwrap();
        assert_eq!(buf, [0, 1, 1]);
    }

    #[tokio::test]
    async fn buf_reader_buf() {
        let mut read = BufReader::with_capacity(3, INPUT);
        let mut buf = BytesMut::with_capacity(3);

        read.read_buf(&mut buf).await.unwrap();
        assert_eq!(&buf[..], [1, 2, 3]);

        read.read_buf(&mut buf).await.unwrap();
        assert_eq!(&buf[..], [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
    }

    #[tokio::test]
    async fn buf_reader_extend_buf() {
        let read = BufReader::with_capacity(3, INPUT);
        futures_03_dep::pin_mut!(read);

        let first = read.as_mut().extend_buf_tokio_02().await.unwrap();
        assert_eq!(first, 3);
        assert_eq!(read.buffer(), [1, 2, 3]);

        let second = read.as_mut().extend_buf_tokio_02().await.unwrap();
        assert_eq!(second, 7);
        assert_eq!(read.buffer(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
    }
}
836
#[cfg(test)]
#[cfg(feature = "tokio")]
mod tests_tokio_1 {
    use super::{BufReader, Bufferless, CombineRead};

    use std::{io, pin::Pin};

    use {
        bytes::BytesMut,
        tokio_dep::{
            self as tokio,
            io::{AsyncRead, AsyncReadExt},
        },
    };

    // Ten-byte input shared by every test in this module.
    const INPUT: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 0];

    impl<R: AsyncRead> BufReader<R> {
        // Test helper: drives `Bufferless::poll_extend_buf` to completion.
        async fn extend_buf_tokio(mut self: Pin<&mut Self>) -> io::Result<usize> {
            let mut strategy = Bufferless;
            crate::future_ext::poll_fn(|cx| strategy.poll_extend_buf(cx, self.as_mut())).await
        }
    }

    #[tokio::test]
    async fn buf_reader() {
        let mut read = BufReader::with_capacity(3, INPUT);

        // The first three reads each fill a fresh 3-byte buffer.
        for expected in &[[1u8, 2, 3], [4, 5, 6], [7, 8, 9]] {
            let mut buf = [0u8; 3];
            read.read(&mut buf).await.unwrap();
            assert_eq!(&buf, expected);
        }

        // Only one byte is left, so the trailing bytes keep their initial value.
        let mut buf = [1u8; 3];
        read.read(&mut buf).await.unwrap();
        assert_eq!(buf, [0, 1, 1]);
    }

    #[tokio::test]
    async fn buf_reader_buf() {
        let mut read = BufReader::with_capacity(3, INPUT);
        let mut buf = BytesMut::with_capacity(3);

        read.read_buf(&mut buf).await.unwrap();
        assert_eq!(&buf[..], [1, 2, 3]);

        read.read_buf(&mut buf).await.unwrap();
        assert_eq!(&buf[..], [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
    }

    #[tokio::test]
    async fn buf_reader_extend_buf() {
        let read = BufReader::with_capacity(3, INPUT);
        futures_03_dep::pin_mut!(read);

        let first = read.as_mut().extend_buf_tokio().await.unwrap();
        assert_eq!(first, 3);
        assert_eq!(read.buffer(), [1, 2, 3]);

        let second = read.as_mut().extend_buf_tokio().await.unwrap();
        assert_eq!(second, 7);
        assert_eq!(read.buffer(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
    }
}
903
#[cfg(test)]
mod tests_sync {
    use super::{BufReader, Bufferless, CombineSyncRead};

    use std::io::Read;

    // Ten-byte input shared by every test in this module.
    const INPUT: &[u8] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 0];

    #[test]
    #[allow(clippy::unused_io_amount)]
    fn buf_reader() {
        let mut read = BufReader::with_capacity(3, INPUT);

        // The first three reads each fill a fresh 3-byte buffer.
        for expected in &[[1u8, 2, 3], [4, 5, 6], [7, 8, 9]] {
            let mut buf = [0u8; 3];
            read.read(&mut buf).unwrap();
            assert_eq!(&buf, expected);
        }

        // Only one byte is left, so the trailing bytes keep their initial value.
        let mut buf = [1u8; 3];
        read.read(&mut buf).unwrap();
        assert_eq!(buf, [0, 1, 1]);
    }

    #[test]
    fn buf_reader_extend_buf() {
        let mut read = BufReader::with_capacity(3, INPUT);
        let mut strategy = Bufferless;

        let first = strategy.extend_buf_sync(&mut read).unwrap();
        assert_eq!(first, 3);
        assert_eq!(read.buffer(), [1, 2, 3]);

        let second = strategy.extend_buf_sync(&mut read).unwrap();
        assert_eq!(second, 7);
        assert_eq!(read.buffer(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
    }
}