axum_core/
body.rs

1//! HTTP body utilities.
2
3use crate::{BoxError, Error};
4use bytes::Bytes;
5use futures_core::{Stream, TryStream};
6use http_body::{Body as _, Frame};
7use http_body_util::BodyExt;
8use pin_project_lite::pin_project;
9use std::pin::Pin;
10use std::task::{ready, Context, Poll};
11use sync_wrapper::SyncWrapper;
12
13type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, Error>;
14
15fn boxed<B>(body: B) -> BoxBody
16where
17    B: http_body::Body<Data = Bytes> + Send + 'static,
18    B::Error: Into<BoxError>,
19{
20    try_downcast(body).unwrap_or_else(|body| body.map_err(Error::new).boxed_unsync())
21}
22
23pub(crate) fn try_downcast<T, K>(k: K) -> Result<T, K>
24where
25    T: 'static,
26    K: Send + 'static,
27{
28    let mut k = Some(k);
29    if let Some(k) = <dyn std::any::Any>::downcast_mut::<Option<T>>(&mut k) {
30        Ok(k.take().unwrap())
31    } else {
32        Err(k.unwrap())
33    }
34}
35
36/// The body type used in axum requests and responses.
37#[must_use]
38#[derive(Debug)]
39pub struct Body(BoxBody);
40
41impl Body {
42    /// Create a new `Body` that wraps another [`http_body::Body`].
43    pub fn new<B>(body: B) -> Self
44    where
45        B: http_body::Body<Data = Bytes> + Send + 'static,
46        B::Error: Into<BoxError>,
47    {
48        try_downcast(body).unwrap_or_else(|body| Self(boxed(body)))
49    }
50
51    /// Create an empty body.
52    pub fn empty() -> Self {
53        Self::new(http_body_util::Empty::new())
54    }
55
56    /// Create a new `Body` from a [`Stream`].
57    ///
58    /// [`Stream`]: https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html
59    pub fn from_stream<S>(stream: S) -> Self
60    where
61        S: TryStream + Send + 'static,
62        S::Ok: Into<Bytes>,
63        S::Error: Into<BoxError>,
64    {
65        Self::new(StreamBody {
66            stream: SyncWrapper::new(stream),
67        })
68    }
69
70    /// Convert the body into a [`Stream`] of data frames.
71    ///
72    /// Non-data frames (such as trailers) will be discarded. Use [`http_body_util::BodyStream`] if
73    /// you need a [`Stream`] of all frame types.
74    ///
75    /// [`http_body_util::BodyStream`]: https://docs.rs/http-body-util/latest/http_body_util/struct.BodyStream.html
76    pub fn into_data_stream(self) -> BodyDataStream {
77        BodyDataStream { inner: self }
78    }
79}
80
81impl Default for Body {
82    fn default() -> Self {
83        Self::empty()
84    }
85}
86
87impl From<()> for Body {
88    fn from(_: ()) -> Self {
89        Self::empty()
90    }
91}
92
93macro_rules! body_from_impl {
94    ($ty:ty) => {
95        impl From<$ty> for Body {
96            fn from(buf: $ty) -> Self {
97                Self::new(http_body_util::Full::from(buf))
98            }
99        }
100    };
101}
102
103body_from_impl!(&'static [u8]);
104body_from_impl!(std::borrow::Cow<'static, [u8]>);
105body_from_impl!(Vec<u8>);
106
107body_from_impl!(&'static str);
108body_from_impl!(std::borrow::Cow<'static, str>);
109body_from_impl!(String);
110
111body_from_impl!(Bytes);
112
113impl http_body::Body for Body {
114    type Data = Bytes;
115    type Error = Error;
116
117    #[inline]
118    fn poll_frame(
119        mut self: Pin<&mut Self>,
120        cx: &mut Context<'_>,
121    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
122        Pin::new(&mut self.0).poll_frame(cx)
123    }
124
125    #[inline]
126    fn size_hint(&self) -> http_body::SizeHint {
127        self.0.size_hint()
128    }
129
130    #[inline]
131    fn is_end_stream(&self) -> bool {
132        self.0.is_end_stream()
133    }
134}
135
136/// A stream of data frames.
137///
138/// Created with [`Body::into_data_stream`].
139#[must_use]
140#[derive(Debug)]
141pub struct BodyDataStream {
142    inner: Body,
143}
144
145impl Stream for BodyDataStream {
146    type Item = Result<Bytes, Error>;
147
148    #[inline]
149    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
150        loop {
151            match ready!(Pin::new(&mut self.inner).poll_frame(cx)?) {
152                Some(frame) => match frame.into_data() {
153                    Ok(data) => return Poll::Ready(Some(Ok(data))),
154                    Err(_frame) => {}
155                },
156                None => return Poll::Ready(None),
157            }
158        }
159    }
160
161    #[inline]
162    fn size_hint(&self) -> (usize, Option<usize>) {
163        let size_hint = self.inner.size_hint();
164        let lower = usize::try_from(size_hint.lower()).unwrap_or_default();
165        let upper = size_hint.upper().and_then(|v| usize::try_from(v).ok());
166        (lower, upper)
167    }
168}
169
170impl http_body::Body for BodyDataStream {
171    type Data = Bytes;
172    type Error = Error;
173
174    #[inline]
175    fn poll_frame(
176        mut self: Pin<&mut Self>,
177        cx: &mut Context<'_>,
178    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
179        Pin::new(&mut self.inner).poll_frame(cx)
180    }
181
182    #[inline]
183    fn is_end_stream(&self) -> bool {
184        self.inner.is_end_stream()
185    }
186
187    #[inline]
188    fn size_hint(&self) -> http_body::SizeHint {
189        self.inner.size_hint()
190    }
191}
192
193pin_project! {
194    struct StreamBody<S> {
195        #[pin]
196        stream: SyncWrapper<S>,
197    }
198}
199
200impl<S> http_body::Body for StreamBody<S>
201where
202    S: TryStream,
203    S::Ok: Into<Bytes>,
204    S::Error: Into<BoxError>,
205{
206    type Data = Bytes;
207    type Error = Error;
208
209    fn poll_frame(
210        self: Pin<&mut Self>,
211        cx: &mut Context<'_>,
212    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
213        let stream = self.project().stream.get_pin_mut();
214        match ready!(stream.try_poll_next(cx)) {
215            Some(Ok(chunk)) => Poll::Ready(Some(Ok(Frame::data(chunk.into())))),
216            Some(Err(err)) => Poll::Ready(Some(Err(Error::new(err)))),
217            None => Poll::Ready(None),
218        }
219    }
220}
221
222#[test]
223fn test_try_downcast() {
224    assert_eq!(try_downcast::<i32, _>(5_u32), Err(5_u32));
225    assert_eq!(try_downcast::<i32, _>(5_i32), Ok(5_i32));
226}