Skip to main content

hyper/body/
incoming.rs

1use std::fmt;
2#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use bytes::Bytes;
8#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
9use futures_channel::{mpsc, oneshot};
10#[cfg(all(
11    any(feature = "http1", feature = "http2"),
12    any(feature = "client", feature = "server")
13))]
14use futures_core::ready;
15#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
16use futures_core::{stream::FusedStream, Stream}; // for mpsc::Receiver
17#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
18use http::HeaderMap;
19use http_body::{Body, Frame, SizeHint};
20
21#[cfg(all(
22    any(feature = "http1", feature = "http2"),
23    any(feature = "client", feature = "server")
24))]
25use super::DecodedLength;
26#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
27use crate::common::watch;
28#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
29use crate::proto::h2::ping;
30
/// Sending half of the data channel: each item is either a body chunk or the
/// error that terminated the body.
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
/// Sending half of the one-shot channel that delivers the trailer headers.
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
type TrailersSender = oneshot::Sender<HeaderMap>;
35
36/// A stream of `Bytes`, used when receiving bodies from the network.
37///
38/// Note that Users should not instantiate this struct directly. When working with the hyper client,
39/// `Incoming` is returned to you in responses. Similarly, when operating with the hyper server,
40/// it is provided within requests.
41///
42/// # Examples
43///
44/// ```rust,ignore
45/// async fn echo(
46///    req: Request<hyper::body::Incoming>,
47/// ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
48///    //Here, you can process `Incoming`
49/// }
50/// ```
51#[must_use = "streams do nothing unless polled"]
52pub struct Incoming {
53    kind: Kind,
54}
55
/// The source backing an [`Incoming`] body.
enum Kind {
    /// A body with no frames; polling it yields `None` immediately.
    Empty,
    /// A body fed through in-process channels by a [`Sender`].
    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
    Chan {
        /// Length still expected; reduced as data chunks are yielded.
        content_length: DecodedLength,
        /// Set to `WANT_READY` every time the body is polled for a frame.
        want_tx: watch::Sender,
        /// Receives data chunks, or the error that ends the body.
        data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
        /// Receives the trailers; only polled once the data channel is done.
        trailers_rx: oneshot::Receiver<HeaderMap>,
    },
    /// A body read from an HTTP/2 receive stream.
    #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
    H2 {
        /// Length still expected; reduced as data frames are yielded.
        content_length: DecodedLength,
        /// Becomes `true` once the stream reports no more data, so later polls
        /// go straight to the trailers.
        data_done: bool,
        /// Notified of every data frame and of the trailers.
        ping: ping::Recorder,
        /// The underlying HTTP/2 stream.
        recv: h2::RecvStream,
    },
    /// A body supplied through the FFI layer.
    #[cfg(feature = "ffi")]
    Ffi(crate::ffi::UserBody),
}
75
/// The sending half paired with a channel-backed [`Incoming`], as returned by
/// `Incoming::new_channel()` (and by the test-only `Incoming::channel()`).
///
/// Useful when wanting to stream chunks from another thread.
///
/// ## Body Closing
///
/// Dropping the sender closes the body normally (meaning that the empty
/// terminating chunk will be sent to the remote). To end the body abnormally
/// instead (e.g. because asynchronous processing failed part-way through), push
/// an error with `Sender::send_error()`; the test-only `Sender::abort()` does
/// exactly that with a "body write aborted" error.
#[must_use = "Sender does nothing unless sent on"]
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
pub(crate) struct Sender {
    /// Observes whether the receiving body has asked for data yet.
    want_rx: watch::Receiver,
    /// Carries data chunks (or a terminating error) to the body.
    data_tx: BodySender,
    /// Carries the trailers; `None` once a send has been attempted.
    trailers_tx: Option<TrailersSender>,
}
96
/// `want` state meaning the body has not been polled yet; `Sender::poll_want`
/// reports `Pending` while this value is observed.
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
const WANT_PENDING: usize = 1;
/// `want` state meaning the sender may proceed; the body stores this value
/// every time it is polled for a frame.
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
const WANT_READY: usize = 2;
101
102impl Incoming {
103    /// Create a `Body` stream with an associated sender half.
104    ///
105    /// Useful when wanting to stream chunks from another thread.
106    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
107    #[inline]
108    #[cfg(test)]
109    pub(crate) fn channel() -> (Sender, Incoming) {
110        Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false)
111    }
112
113    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
114    pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Incoming) {
115        let (data_tx, data_rx) = mpsc::channel(0);
116        let (trailers_tx, trailers_rx) = oneshot::channel();
117
118        // If wanter is true, `Sender::poll_ready()` won't becoming ready
119        // until the `Body` has been polled for data once.
120        let want = if wanter { WANT_PENDING } else { WANT_READY };
121
122        let (want_tx, want_rx) = watch::channel(want);
123
124        let tx = Sender {
125            want_rx,
126            data_tx,
127            trailers_tx: Some(trailers_tx),
128        };
129        let rx = Incoming::new(Kind::Chan {
130            content_length,
131            want_tx,
132            data_rx,
133            trailers_rx,
134        });
135
136        (tx, rx)
137    }
138
139    fn new(kind: Kind) -> Incoming {
140        Incoming { kind }
141    }
142
143    #[allow(dead_code)]
144    pub(crate) fn empty() -> Incoming {
145        Incoming::new(Kind::Empty)
146    }
147
148    #[cfg(feature = "ffi")]
149    pub(crate) fn ffi() -> Incoming {
150        Incoming::new(Kind::Ffi(crate::ffi::UserBody::new()))
151    }
152
153    #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
154    pub(crate) fn h2(
155        recv: h2::RecvStream,
156        mut content_length: DecodedLength,
157        ping: ping::Recorder,
158    ) -> Self {
159        // If the stream is already EOS, then the "unknown length" is clearly
160        // actually ZERO.
161        if !content_length.is_exact() && recv.is_end_stream() {
162            content_length = DecodedLength::ZERO;
163        }
164
165        Incoming::new(Kind::H2 {
166            data_done: false,
167            ping,
168            content_length,
169            recv,
170        })
171    }
172
173    #[cfg(feature = "ffi")]
174    pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody {
175        match self.kind {
176            Kind::Ffi(ref mut body) => return body,
177            _ => {
178                self.kind = Kind::Ffi(crate::ffi::UserBody::new());
179            }
180        }
181
182        match self.kind {
183            Kind::Ffi(ref mut body) => body,
184            _ => unreachable!(),
185        }
186    }
187}
188
189impl Body for Incoming {
190    type Data = Bytes;
191    type Error = crate::Error;
192
193    fn poll_frame(
194        #[cfg_attr(
195            not(all(
196                any(feature = "http1", feature = "http2"),
197                any(feature = "client", feature = "server")
198            )),
199            allow(unused_mut)
200        )]
201        mut self: Pin<&mut Self>,
202        #[cfg_attr(
203            not(all(
204                any(feature = "http1", feature = "http2"),
205                any(feature = "client", feature = "server")
206            )),
207            allow(unused_variables)
208        )]
209        cx: &mut Context<'_>,
210    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
211        match self.kind {
212            Kind::Empty => Poll::Ready(None),
213            #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
214            Kind::Chan {
215                content_length: ref mut len,
216                ref mut data_rx,
217                ref mut want_tx,
218                ref mut trailers_rx,
219            } => {
220                want_tx.send(WANT_READY);
221
222                if !data_rx.is_terminated() {
223                    if let Some(chunk) = ready!(Pin::new(data_rx).poll_next(cx)?) {
224                        len.sub_if(chunk.len() as u64);
225                        return Poll::Ready(Some(Ok(Frame::data(chunk))));
226                    }
227                }
228
229                // check trailers after data is terminated
230                match ready!(Pin::new(trailers_rx).poll(cx)) {
231                    Ok(t) => Poll::Ready(Some(Ok(Frame::trailers(t)))),
232                    Err(_) => Poll::Ready(None),
233                }
234            }
235            #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
236            Kind::H2 {
237                ref mut data_done,
238                ref ping,
239                recv: ref mut h2,
240                content_length: ref mut len,
241            } => {
242                if !*data_done {
243                    match ready!(h2.poll_data(cx)) {
244                        Some(Ok(bytes)) => {
245                            let _ = h2.flow_control().release_capacity(bytes.len());
246                            len.sub_if(bytes.len() as u64);
247                            ping.record_data(bytes.len());
248                            return Poll::Ready(Some(Ok(Frame::data(bytes))));
249                        }
250                        Some(Err(e)) => {
251                            return match e.reason() {
252                                // These reasons should cause the body reading to stop, but not fail it.
253                                // The same logic as for `Read for H2Upgraded` is applied here.
254                                Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => {
255                                    Poll::Ready(None)
256                                }
257                                _ => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
258                            };
259                        }
260                        None => {
261                            *data_done = true;
262                            // fall through to trailers
263                        }
264                    }
265                }
266
267                // after data, check trailers
268                match ready!(h2.poll_trailers(cx)) {
269                    Ok(t) => {
270                        ping.record_non_data();
271                        Poll::Ready(Ok(t.map(Frame::trailers)).transpose())
272                    }
273                    Err(e) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))),
274                }
275            }
276
277            #[cfg(feature = "ffi")]
278            Kind::Ffi(ref mut body) => body.poll_data(cx),
279        }
280    }
281
282    fn is_end_stream(&self) -> bool {
283        match self.kind {
284            Kind::Empty => true,
285            #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
286            Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO,
287            #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
288            Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
289            #[cfg(feature = "ffi")]
290            Kind::Ffi(..) => false,
291        }
292    }
293
294    fn size_hint(&self) -> SizeHint {
295        #[cfg(all(
296            any(feature = "http1", feature = "http2"),
297            any(feature = "client", feature = "server")
298        ))]
299        fn opt_len(decoded_length: DecodedLength) -> SizeHint {
300            if let Some(content_length) = decoded_length.into_opt() {
301                SizeHint::with_exact(content_length)
302            } else {
303                SizeHint::default()
304            }
305        }
306
307        match self.kind {
308            Kind::Empty => SizeHint::with_exact(0),
309            #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
310            Kind::Chan { content_length, .. } => opt_len(content_length),
311            #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
312            Kind::H2 { content_length, .. } => opt_len(content_length),
313            #[cfg(feature = "ffi")]
314            Kind::Ffi(..) => SizeHint::default(),
315        }
316    }
317}
318
319impl fmt::Debug for Incoming {
320    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
321        #[cfg(any(
322            all(
323                any(feature = "http1", feature = "http2"),
324                any(feature = "client", feature = "server")
325            ),
326            feature = "ffi"
327        ))]
328        #[derive(Debug)]
329        struct Streaming;
330        #[derive(Debug)]
331        struct Empty;
332
333        let mut builder = f.debug_tuple("Body");
334        match self.kind {
335            Kind::Empty => builder.field(&Empty),
336            #[cfg(any(
337                all(
338                    any(feature = "http1", feature = "http2"),
339                    any(feature = "client", feature = "server")
340                ),
341                feature = "ffi"
342            ))]
343            _ => builder.field(&Streaming),
344        };
345
346        builder.finish()
347    }
348}
349
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
impl Sender {
    /// Polls whether this `Sender` can send another chunk.
    ///
    /// Stays pending until the receiving body has asked for data and the data
    /// channel reports it is ready. Any failure is reported as a "closed" error.
    pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        // First make sure the receiver end has tried polling for the body.
        if let Err(closed) = ready!(self.poll_want(cx)) {
            return Poll::Ready(Err(closed));
        }

        let channel_state = self.data_tx.poll_ready(cx);
        channel_state.map_err(|_| crate::Error::new_closed())
    }

    /// Translates the current `want` state into a poll result.
    fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        let state = self.want_rx.load(cx);
        match state {
            WANT_READY => Poll::Ready(Ok(())),
            WANT_PENDING => Poll::Pending,
            watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())),
            unexpected => unreachable!("want_rx value: {}", unexpected),
        }
    }

    /// Resolves once `poll_ready` reports ready (or fails).
    #[cfg(test)]
    async fn ready(&mut self) -> crate::Result<()> {
        futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
    }

    /// Waits until the channel is ready, then sends `chunk` on the data channel.
    #[cfg(test)]
    #[allow(unused)]
    pub(crate) async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
        self.ready().await?;
        match self.data_tx.try_send(Ok(chunk)) {
            Ok(()) => Ok(()),
            Err(_) => Err(crate::Error::new_closed()),
        }
    }

    /// Sends `trailers` on the trailers channel.
    ///
    /// The trailers sender is consumed by the first call, so every later call
    /// fails with a "closed" error, as does sending to a dropped receiver.
    #[allow(unused)]
    pub(crate) async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
        match self.trailers_tx.take() {
            Some(tx) => tx.send(trailers).map_err(|_| crate::Error::new_closed()),
            None => Err(crate::Error::new_closed()),
        }
    }

    /// Try to send data on this channel without waiting.
    ///
    /// # Errors
    ///
    /// Returns `Err(Bytes)`, handing the chunk back, if the channel could not
    /// (currently) accept another `Bytes`.
    ///
    /// # Note
    ///
    /// This is mostly useful for when trying to send from some other thread
    /// that doesn't have an async context. If in an async context, prefer
    /// `send_data()` instead.
    pub(crate) fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
        match self.data_tx.try_send(Ok(chunk)) {
            Ok(()) => Ok(()),
            // The rejected message is the `Ok(chunk)` that was just passed in.
            Err(rejected) => Err(rejected.into_inner().expect("just sent Ok")),
        }
    }

    /// Try to send the trailers without waiting.
    ///
    /// # Errors
    ///
    /// Returns `Err(None)` if the trailers sender was already consumed by an
    /// earlier attempt, and `Err(Some(trailers))`, handing the map back, if the
    /// send itself failed.
    pub(crate) fn try_send_trailers(
        &mut self,
        trailers: HeaderMap,
    ) -> Result<(), Option<HeaderMap>> {
        match self.trailers_tx.take() {
            None => Err(None),
            Some(tx) => tx.send(trailers).map_err(Some),
        }
    }

    /// Consumes the sender and ends the body with a "body write aborted" error.
    #[cfg(test)]
    pub(crate) fn abort(mut self) {
        let reason = crate::Error::new_body_write_aborted();
        self.send_error(reason);
    }

    /// Pushes `err` onto the data channel on a best-effort basis; a failed send
    /// is ignored.
    pub(crate) fn send_error(&mut self, err: crate::Error) {
        // Send through a clone so the send works even if the buffer is full.
        let mut error_tx = self.data_tx.clone();
        let _ = error_tx.try_send(Err(err));
    }
}
440
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
impl fmt::Debug for Sender {
    /// Formats as `Sender(Closed)` when the `want` channel reads as closed and
    /// as `Sender(Open)` otherwise.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        #[derive(Debug)]
        struct Open;
        #[derive(Debug)]
        struct Closed;

        let state: &dyn fmt::Debug = if self.want_rx.peek() == watch::CLOSED {
            &Closed
        } else {
            &Open
        };

        f.debug_tuple("Sender").field(state).finish()
    }
}
458
// Every test here exercises the channel-backed body, which only exists with
// `http1` plus `client` or `server`. The whole module is gated once on that
// predicate so no test can be compiled without the imports and constructors
// it relies on.
#[cfg(all(test, feature = "http1", any(feature = "client", feature = "server")))]
mod tests {
    use std::mem;
    use std::task::Poll;

    use super::{Body, DecodedLength, Incoming, Sender, SizeHint};
    use http_body_util::BodyExt;

    /// Guards against accidental growth of `Incoming` and `Sender`.
    #[test]
    fn test_size_of() {
        // These are mostly to help catch *accidentally* increasing
        // the size by too much.

        let body_size = mem::size_of::<Incoming>();
        let body_expected_size = mem::size_of::<u64>() * 5;
        assert!(
            body_size <= body_expected_size,
            "Body size = {} <= {}",
            body_size,
            body_expected_size,
        );

        //assert_eq!(body_size, mem::size_of::<Option<Incoming>>(), "Option<Incoming>");

        assert_eq!(
            mem::size_of::<Sender>(),
            mem::size_of::<usize>() * 5,
            "Sender"
        );

        assert_eq!(
            mem::size_of::<Sender>(),
            mem::size_of::<Option<Sender>>(),
            "Option<Sender>"
        );
    }

    /// The size hint reflects the decoded length the body was created with.
    #[test]
    fn size_hint() {
        fn eq(body: Incoming, b: SizeHint, note: &str) {
            let a = body.size_hint();
            assert_eq!(a.lower(), b.lower(), "lower for {:?}", note);
            assert_eq!(a.upper(), b.upper(), "upper for {:?}", note);
        }

        eq(Incoming::empty(), SizeHint::with_exact(0), "empty");

        eq(Incoming::channel().1, SizeHint::new(), "channel");

        eq(
            Incoming::new_channel(DecodedLength::new(4), /*wanter =*/ false).1,
            SizeHint::with_exact(4),
            "channel with length",
        );
    }

    /// Aborting the sender surfaces a "body write aborted" error on the body.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn channel_abort() {
        let (tx, mut rx) = Incoming::channel();

        tx.abort();

        let err = rx.frame().await.unwrap().unwrap_err();
        assert!(err.is_body_write_aborted(), "{:?}", err);
    }

    /// An abort still gets through when a chunk is already queued.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn channel_abort_when_buffer_is_full() {
        let (mut tx, mut rx) = Incoming::channel();

        tx.try_send_data("chunk 1".into()).expect("send 1");
        // buffer is full, but can still send abort
        tx.abort();

        let chunk1 = rx
            .frame()
            .await
            .expect("item 1")
            .expect("chunk 1")
            .into_data()
            .unwrap();
        assert_eq!(chunk1, "chunk 1");

        let err = rx.frame().await.unwrap().unwrap_err();
        assert!(err.is_body_write_aborted(), "{:?}", err);
    }

    /// Exactly one chunk can be queued before `try_send_data` hands chunks back.
    #[test]
    fn channel_buffers_one() {
        let (mut tx, _rx) = Incoming::channel();

        tx.try_send_data("chunk 1".into()).expect("send 1");

        // buffer is now full
        let chunk2 = tx.try_send_data("chunk 2".into()).expect_err("send 2");
        assert_eq!(chunk2, "chunk 2");
    }

    /// A body whose sender is dropped without sending anything yields no frames.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn channel_empty() {
        let (_, mut rx) = Incoming::channel();

        assert!(rx.frame().await.is_none());
    }

    /// Without `wanter`, the sender is ready before the body is ever polled.
    #[test]
    fn channel_ready() {
        let (mut tx, _rx) = Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ false);

        let mut tx_ready = tokio_test::task::spawn(tx.ready());

        assert!(tx_ready.poll().is_ready(), "tx is ready immediately");
    }

    /// With `wanter`, the sender only becomes ready after the body is polled.
    #[test]
    fn channel_wanter() {
        let (mut tx, mut rx) =
            Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);

        let mut tx_ready = tokio_test::task::spawn(tx.ready());
        let mut rx_data = tokio_test::task::spawn(rx.frame());

        assert!(
            tx_ready.poll().is_pending(),
            "tx isn't ready before rx has been polled"
        );

        assert!(rx_data.poll().is_pending(), "poll rx.data");
        assert!(tx_ready.is_woken(), "rx poll wakes tx");

        assert!(
            tx_ready.poll().is_ready(),
            "tx is ready after rx has been polled"
        );
    }

    /// Dropping the body wakes a waiting sender, which then sees a closed error.
    #[test]
    fn channel_notices_closure() {
        let (mut tx, rx) = Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);

        let mut tx_ready = tokio_test::task::spawn(tx.ready());

        assert!(
            tx_ready.poll().is_pending(),
            "tx isn't ready before rx has been polled"
        );

        drop(rx);
        assert!(tx_ready.is_woken(), "dropping rx wakes tx");

        match tx_ready.poll() {
            Poll::Ready(Err(ref e)) if e.is_closed() => (),
            unexpected => panic!("tx poll ready unexpected: {:?}", unexpected),
        }
    }
}