axum_core/
body.rs

1//! HTTP body utilities.
2
3use crate::{BoxError, Error};
4use bytes::Bytes;
5use futures_core::{Stream, TryStream};
6use http_body::{Body as _, Frame};
7use http_body_util::BodyExt;
8use pin_project_lite::pin_project;
9use std::pin::Pin;
10use std::task::{ready, Context, Poll};
11use sync_wrapper::SyncWrapper;
12
13type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, Error>;
14
15fn boxed<B>(body: B) -> BoxBody
16where
17    B: http_body::Body<Data = Bytes> + Send + 'static,
18    B::Error: Into<BoxError>,
19{
20    try_downcast(body).unwrap_or_else(|body| body.map_err(Error::new).boxed_unsync())
21}
22
23pub(crate) fn try_downcast<T, K>(k: K) -> Result<T, K>
24where
25    T: 'static,
26    K: Send + 'static,
27{
28    let mut k = Some(k);
29    if let Some(k) = <dyn std::any::Any>::downcast_mut::<Option<T>>(&mut k) {
30        Ok(k.take().unwrap())
31    } else {
32        Err(k.unwrap())
33    }
34}
35
36/// The body type used in axum requests and responses.
37#[derive(Debug)]
38pub struct Body(BoxBody);
39
40impl Body {
41    /// Create a new `Body` that wraps another [`http_body::Body`].
42    pub fn new<B>(body: B) -> Self
43    where
44        B: http_body::Body<Data = Bytes> + Send + 'static,
45        B::Error: Into<BoxError>,
46    {
47        try_downcast(body).unwrap_or_else(|body| Self(boxed(body)))
48    }
49
50    /// Create an empty body.
51    pub fn empty() -> Self {
52        Self::new(http_body_util::Empty::new())
53    }
54
55    /// Create a new `Body` from a [`Stream`].
56    ///
57    /// [`Stream`]: https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html
58    pub fn from_stream<S>(stream: S) -> Self
59    where
60        S: TryStream + Send + 'static,
61        S::Ok: Into<Bytes>,
62        S::Error: Into<BoxError>,
63    {
64        Self::new(StreamBody {
65            stream: SyncWrapper::new(stream),
66        })
67    }
68
69    /// Convert the body into a [`Stream`] of data frames.
70    ///
71    /// Non-data frames (such as trailers) will be discarded. Use [`http_body_util::BodyStream`] if
72    /// you need a [`Stream`] of all frame types.
73    ///
74    /// [`http_body_util::BodyStream`]: https://docs.rs/http-body-util/latest/http_body_util/struct.BodyStream.html
75    pub fn into_data_stream(self) -> BodyDataStream {
76        BodyDataStream { inner: self }
77    }
78}
79
80impl Default for Body {
81    fn default() -> Self {
82        Self::empty()
83    }
84}
85
86impl From<()> for Body {
87    fn from(_: ()) -> Self {
88        Self::empty()
89    }
90}
91
92macro_rules! body_from_impl {
93    ($ty:ty) => {
94        impl From<$ty> for Body {
95            fn from(buf: $ty) -> Self {
96                Self::new(http_body_util::Full::from(buf))
97            }
98        }
99    };
100}
101
102body_from_impl!(&'static [u8]);
103body_from_impl!(std::borrow::Cow<'static, [u8]>);
104body_from_impl!(Vec<u8>);
105
106body_from_impl!(&'static str);
107body_from_impl!(std::borrow::Cow<'static, str>);
108body_from_impl!(String);
109
110body_from_impl!(Bytes);
111
112impl http_body::Body for Body {
113    type Data = Bytes;
114    type Error = Error;
115
116    #[inline]
117    fn poll_frame(
118        mut self: Pin<&mut Self>,
119        cx: &mut Context<'_>,
120    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
121        Pin::new(&mut self.0).poll_frame(cx)
122    }
123
124    #[inline]
125    fn size_hint(&self) -> http_body::SizeHint {
126        self.0.size_hint()
127    }
128
129    #[inline]
130    fn is_end_stream(&self) -> bool {
131        self.0.is_end_stream()
132    }
133}
134
135/// A stream of data frames.
136///
137/// Created with [`Body::into_data_stream`].
138#[derive(Debug)]
139pub struct BodyDataStream {
140    inner: Body,
141}
142
143impl Stream for BodyDataStream {
144    type Item = Result<Bytes, Error>;
145
146    #[inline]
147    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
148        loop {
149            match ready!(Pin::new(&mut self.inner).poll_frame(cx)?) {
150                Some(frame) => match frame.into_data() {
151                    Ok(data) => return Poll::Ready(Some(Ok(data))),
152                    Err(_frame) => {}
153                },
154                None => return Poll::Ready(None),
155            }
156        }
157    }
158
159    #[inline]
160    fn size_hint(&self) -> (usize, Option<usize>) {
161        let size_hint = self.inner.size_hint();
162        let lower = usize::try_from(size_hint.lower()).unwrap_or_default();
163        let upper = size_hint.upper().and_then(|v| usize::try_from(v).ok());
164        (lower, upper)
165    }
166}
167
168impl http_body::Body for BodyDataStream {
169    type Data = Bytes;
170    type Error = Error;
171
172    #[inline]
173    fn poll_frame(
174        mut self: Pin<&mut Self>,
175        cx: &mut Context<'_>,
176    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
177        Pin::new(&mut self.inner).poll_frame(cx)
178    }
179
180    #[inline]
181    fn is_end_stream(&self) -> bool {
182        self.inner.is_end_stream()
183    }
184
185    #[inline]
186    fn size_hint(&self) -> http_body::SizeHint {
187        self.inner.size_hint()
188    }
189}
190
191pin_project! {
192    struct StreamBody<S> {
193        #[pin]
194        stream: SyncWrapper<S>,
195    }
196}
197
198impl<S> http_body::Body for StreamBody<S>
199where
200    S: TryStream,
201    S::Ok: Into<Bytes>,
202    S::Error: Into<BoxError>,
203{
204    type Data = Bytes;
205    type Error = Error;
206
207    fn poll_frame(
208        self: Pin<&mut Self>,
209        cx: &mut Context<'_>,
210    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
211        let stream = self.project().stream.get_pin_mut();
212        match ready!(stream.try_poll_next(cx)) {
213            Some(Ok(chunk)) => Poll::Ready(Some(Ok(Frame::data(chunk.into())))),
214            Some(Err(err)) => Poll::Ready(Some(Err(Error::new(err)))),
215            None => Poll::Ready(None),
216        }
217    }
218}
219
220#[test]
221fn test_try_downcast() {
222    assert_eq!(try_downcast::<i32, _>(5_u32), Err(5_u32));
223    assert_eq!(try_downcast::<i32, _>(5_i32), Ok(5_i32));
224}