1use std::fmt;
2#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use bytes::Bytes;
8#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
9use futures_channel::{mpsc, oneshot};
10#[cfg(all(
11 any(feature = "http1", feature = "http2"),
12 any(feature = "client", feature = "server")
13))]
14use futures_core::ready;
15#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
16use futures_core::{stream::FusedStream, Stream}; #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
18use http::HeaderMap;
19use http_body::{Body, Frame, SizeHint};
20
21#[cfg(all(
22 any(feature = "http1", feature = "http2"),
23 any(feature = "client", feature = "server")
24))]
25use super::DecodedLength;
26#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
27use crate::common::watch;
28#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
29use crate::proto::h2::ping;
30
31#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
32type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
33#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
34type TrailersSender = oneshot::Sender<HeaderMap>;
35
/// A body whose frames carry `Bytes` data (and possibly a final trailers
/// frame), read through its [`Body`] implementation.
///
/// Where the frames actually come from is decided by the private `Kind`
/// stored inside; nothing is produced until the body is polled.
#[must_use = "streams do nothing unless polled"]
pub struct Incoming {
    // The backing source this body reads from.
    kind: Kind,
}
55
/// The possible backing sources of an [`Incoming`] body.
enum Kind {
    /// No content at all: polling reports the end of the body right away.
    Empty,
    /// Fed through in-memory channels by a paired `Sender`.
    #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
    Chan {
        /// Length bookkeeping for the body; adjusted with `sub_if` for every
        /// data chunk that is yielded.
        content_length: DecodedLength,
        /// Set to `WANT_READY` on every poll so the `Sender` can observe that
        /// the body is being read.
        want_tx: watch::Sender,
        /// Receives data chunks, or an error, pushed by the `Sender`.
        data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
        /// Receives the trailers, polled only after the data channel is done.
        trailers_rx: oneshot::Receiver<HeaderMap>,
    },
    /// Backed by an `h2::RecvStream`.
    #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
    H2 {
        /// Length bookkeeping for the body; adjusted with `sub_if` for every
        /// data chunk that is yielded.
        content_length: DecodedLength,
        /// Becomes `true` once `poll_data` has returned `None`, so later polls
        /// skip straight to the trailers.
        data_done: bool,
        /// Told about every data chunk and about the trailers poll completing.
        ping: ping::Recorder,
        /// The stream the data and trailers are read from.
        recv: h2::RecvStream,
    },
    /// Backed by a body object supplied through the FFI layer.
    #[cfg(feature = "ffi")]
    Ffi(crate::ffi::UserBody),
}
75
/// The sending half paired with a channel-backed [`Incoming`], as created by
/// `Incoming::new_channel`.
///
/// It pushes data chunks, an error, or trailers towards the body.
#[must_use = "Sender does nothing unless sent on"]
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
pub(crate) struct Sender {
    // Observes the "want" state published by the body: `WANT_PENDING`,
    // `WANT_READY`, or `watch::CLOSED`.
    want_rx: watch::Receiver,
    // Carries data chunks (`Ok`) or a terminal error (`Err`) to the body.
    data_tx: BodySender,
    // One-shot trailers sender; taken (left as `None`) by the first attempt
    // to send trailers.
    trailers_tx: Option<TrailersSender>,
}
96
/// Initial watch value when `new_channel` is called with `wanter == true`:
/// `Sender::poll_want` reports `Pending` while this value is observed.
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
const WANT_PENDING: usize = 1;
/// Watch value meaning the sender may proceed: either the channel was created
/// with `wanter == false`, or the body has been polled at least once.
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
const WANT_READY: usize = 2;
101
102impl Incoming {
103 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
107 #[inline]
108 #[cfg(test)]
109 pub(crate) fn channel() -> (Sender, Incoming) {
110 Self::new_channel(DecodedLength::CHUNKED, false)
111 }
112
113 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
114 pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Incoming) {
115 let (data_tx, data_rx) = mpsc::channel(0);
116 let (trailers_tx, trailers_rx) = oneshot::channel();
117
118 let want = if wanter { WANT_PENDING } else { WANT_READY };
121
122 let (want_tx, want_rx) = watch::channel(want);
123
124 let tx = Sender {
125 want_rx,
126 data_tx,
127 trailers_tx: Some(trailers_tx),
128 };
129 let rx = Incoming::new(Kind::Chan {
130 content_length,
131 want_tx,
132 data_rx,
133 trailers_rx,
134 });
135
136 (tx, rx)
137 }
138
139 fn new(kind: Kind) -> Incoming {
140 Incoming { kind }
141 }
142
143 #[allow(dead_code)]
144 pub(crate) fn empty() -> Incoming {
145 Incoming::new(Kind::Empty)
146 }
147
148 #[cfg(feature = "ffi")]
149 pub(crate) fn ffi() -> Incoming {
150 Incoming::new(Kind::Ffi(crate::ffi::UserBody::new()))
151 }
152
153 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
154 pub(crate) fn h2(
155 recv: h2::RecvStream,
156 mut content_length: DecodedLength,
157 ping: ping::Recorder,
158 ) -> Self {
159 if !content_length.is_exact() && recv.is_end_stream() {
162 content_length = DecodedLength::ZERO;
163 }
164
165 Incoming::new(Kind::H2 {
166 data_done: false,
167 ping,
168 content_length,
169 recv,
170 })
171 }
172
173 #[cfg(feature = "ffi")]
174 pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody {
175 match self.kind {
176 Kind::Ffi(ref mut body) => return body,
177 _ => {
178 self.kind = Kind::Ffi(crate::ffi::UserBody::new());
179 }
180 }
181
182 match self.kind {
183 Kind::Ffi(ref mut body) => body,
184 _ => unreachable!(),
185 }
186 }
187}
188
189impl Body for Incoming {
190 type Data = Bytes;
191 type Error = crate::Error;
192
193 fn poll_frame(
194 #[cfg_attr(
195 not(all(
196 any(feature = "http1", feature = "http2"),
197 any(feature = "client", feature = "server")
198 )),
199 allow(unused_mut)
200 )]
201 mut self: Pin<&mut Self>,
202 #[cfg_attr(
203 not(all(
204 any(feature = "http1", feature = "http2"),
205 any(feature = "client", feature = "server")
206 )),
207 allow(unused_variables)
208 )]
209 cx: &mut Context<'_>,
210 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
211 match self.kind {
212 Kind::Empty => Poll::Ready(None),
213 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
214 Kind::Chan {
215 content_length: ref mut len,
216 ref mut data_rx,
217 ref mut want_tx,
218 ref mut trailers_rx,
219 } => {
220 want_tx.send(WANT_READY);
221
222 if !data_rx.is_terminated() {
223 if let Some(chunk) = ready!(Pin::new(data_rx).poll_next(cx)?) {
224 len.sub_if(chunk.len() as u64);
225 return Poll::Ready(Some(Ok(Frame::data(chunk))));
226 }
227 }
228
229 match ready!(Pin::new(trailers_rx).poll(cx)) {
231 Ok(t) => Poll::Ready(Some(Ok(Frame::trailers(t)))),
232 Err(_) => Poll::Ready(None),
233 }
234 }
235 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
236 Kind::H2 {
237 ref mut data_done,
238 ref ping,
239 recv: ref mut h2,
240 content_length: ref mut len,
241 } => {
242 if !*data_done {
243 match ready!(h2.poll_data(cx)) {
244 Some(Ok(bytes)) => {
245 let _ = h2.flow_control().release_capacity(bytes.len());
246 len.sub_if(bytes.len() as u64);
247 ping.record_data(bytes.len());
248 return Poll::Ready(Some(Ok(Frame::data(bytes))));
249 }
250 Some(Err(e)) => {
251 return match e.reason() {
252 Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => {
255 Poll::Ready(None)
256 }
257 _ => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
258 };
259 }
260 None => {
261 *data_done = true;
262 }
264 }
265 }
266
267 match ready!(h2.poll_trailers(cx)) {
269 Ok(t) => {
270 ping.record_non_data();
271 Poll::Ready(Ok(t.map(Frame::trailers)).transpose())
272 }
273 Err(e) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))),
274 }
275 }
276
277 #[cfg(feature = "ffi")]
278 Kind::Ffi(ref mut body) => body.poll_data(cx),
279 }
280 }
281
282 fn is_end_stream(&self) -> bool {
283 match self.kind {
284 Kind::Empty => true,
285 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
286 Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO,
287 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
288 Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
289 #[cfg(feature = "ffi")]
290 Kind::Ffi(..) => false,
291 }
292 }
293
294 fn size_hint(&self) -> SizeHint {
295 #[cfg(all(
296 any(feature = "http1", feature = "http2"),
297 any(feature = "client", feature = "server")
298 ))]
299 fn opt_len(decoded_length: DecodedLength) -> SizeHint {
300 if let Some(content_length) = decoded_length.into_opt() {
301 SizeHint::with_exact(content_length)
302 } else {
303 SizeHint::default()
304 }
305 }
306
307 match self.kind {
308 Kind::Empty => SizeHint::with_exact(0),
309 #[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
310 Kind::Chan { content_length, .. } => opt_len(content_length),
311 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
312 Kind::H2 { content_length, .. } => opt_len(content_length),
313 #[cfg(feature = "ffi")]
314 Kind::Ffi(..) => SizeHint::default(),
315 }
316 }
317}
318
319impl fmt::Debug for Incoming {
320 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
321 #[cfg(any(
322 all(
323 any(feature = "http1", feature = "http2"),
324 any(feature = "client", feature = "server")
325 ),
326 feature = "ffi"
327 ))]
328 #[derive(Debug)]
329 struct Streaming;
330 #[derive(Debug)]
331 struct Empty;
332
333 let mut builder = f.debug_tuple("Body");
334 match self.kind {
335 Kind::Empty => builder.field(&Empty),
336 #[cfg(any(
337 all(
338 any(feature = "http1", feature = "http2"),
339 any(feature = "client", feature = "server")
340 ),
341 feature = "ffi"
342 ))]
343 _ => builder.field(&Streaming),
344 };
345
346 builder.finish()
347 }
348}
349
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
impl Sender {
    /// Polls until a data chunk may be sent.
    ///
    /// First waits for the body to be wanted (see `poll_want`), then for the
    /// data channel to have room. Any data-channel error is reported as a
    /// "closed" error.
    pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        match self.poll_want(cx) {
            Poll::Ready(Ok(())) => {}
            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
            Poll::Pending => return Poll::Pending,
        }

        let channel_ready = self.data_tx.poll_ready(cx);
        channel_ready.map_err(|_| crate::Error::new_closed())
    }

    /// Translates the current watch value into readiness: ready on
    /// `WANT_READY`, pending on `WANT_PENDING`, and a "closed" error on
    /// `watch::CLOSED`.
    fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        let state = self.want_rx.load(cx);
        match state {
            WANT_READY => Poll::Ready(Ok(())),
            WANT_PENDING => Poll::Pending,
            watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())),
            other => unreachable!("want_rx value: {}", other),
        }
    }

    /// Async wrapper around `poll_ready`, for tests.
    #[cfg(test)]
    async fn ready(&mut self) -> crate::Result<()> {
        futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
    }

    /// Waits until ready, then sends one data chunk. Test-only.
    ///
    /// A failed send is reported as a "closed" error.
    #[cfg(test)]
    #[allow(unused)]
    pub(crate) async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
        self.ready().await?;
        let sent = self.data_tx.try_send(Ok(chunk));
        sent.map_err(|_| crate::Error::new_closed())
    }

    /// Sends the trailers.
    ///
    /// Returns a "closed" error if trailers were already sent (or attempted)
    /// or if the one-shot send fails.
    #[allow(unused)]
    pub(crate) async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
        match self.trailers_tx.take() {
            Some(tx) => tx.send(trailers).map_err(|_| crate::Error::new_closed()),
            None => Err(crate::Error::new_closed()),
        }
    }

    /// Tries to send a data chunk without waiting.
    ///
    /// On failure the chunk is handed back to the caller.
    pub(crate) fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
        match self.data_tx.try_send(Ok(chunk)) {
            Ok(()) => Ok(()),
            // The rejected message is the `Ok(chunk)` passed in just above.
            Err(err) => Err(err.into_inner().expect("just sent Ok")),
        }
    }

    /// Tries to send the trailers without waiting.
    ///
    /// Returns `Err(None)` if the trailers sender was already used, and
    /// `Err(Some(trailers))` if the one-shot send fails.
    pub(crate) fn try_send_trailers(
        &mut self,
        trailers: HeaderMap,
    ) -> Result<(), Option<HeaderMap>> {
        match self.trailers_tx.take() {
            Some(tx) => tx.send(trailers).map_err(Some),
            None => Err(None),
        }
    }

    /// Consumes the sender and reports a "body write aborted" error to the
    /// body. Test-only.
    #[cfg(test)]
    pub(crate) fn abort(mut self) {
        let aborted = crate::Error::new_body_write_aborted();
        self.send_error(aborted);
    }

    /// Pushes an error to the body on a best-effort basis; a failed send is
    /// ignored.
    pub(crate) fn send_error(&mut self, err: crate::Error) {
        // The error goes through a clone of the sender; the
        // `channel_abort_when_buffer_is_full` test relies on this still
        // delivering the error after a chunk is already buffered.
        let mut error_tx = self.data_tx.clone();
        let _ = error_tx.try_send(Err(err));
    }
}
440
/// Formats as `Sender(Closed)` when the watch value is `watch::CLOSED`, and as
/// `Sender(Open)` otherwise.
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
impl fmt::Debug for Sender {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Marker types that exist only to provide the printed names.
        #[derive(Debug)]
        struct Open;
        #[derive(Debug)]
        struct Closed;

        let state: &dyn fmt::Debug = match self.want_rx.peek() {
            watch::CLOSED => &Closed,
            _ => &Open,
        };

        f.debug_tuple("Sender").field(state).finish()
    }
}
458
// Every test here needs the channel-backed body, which only exists with
// `http1` plus `client` or `server`. Gating the module once keeps the tests
// and their imports under the same condition, so a test build with a smaller
// feature set compiles (with no tests) instead of failing on missing items.
#[cfg(all(
    test,
    feature = "http1",
    any(feature = "client", feature = "server")
))]
mod tests {
    use std::mem;
    use std::task::Poll;

    use super::{Body, DecodedLength, Incoming, Sender, SizeHint};
    use http_body_util::BodyExt;

    /// Guards against accidentally growing `Incoming` or `Sender`.
    #[test]
    fn test_size_of() {
        let body_size = mem::size_of::<Incoming>();
        let body_expected_size = mem::size_of::<u64>() * 5;
        assert!(
            body_size <= body_expected_size,
            "Body size = {} <= {}",
            body_size,
            body_expected_size,
        );

        assert_eq!(
            mem::size_of::<Sender>(),
            mem::size_of::<usize>() * 5,
            "Sender"
        );

        assert_eq!(
            mem::size_of::<Sender>(),
            mem::size_of::<Option<Sender>>(),
            "Option<Sender>"
        );
    }

    /// `size_hint` is exact for empty and known-length bodies, and the
    /// default hint for a chunked channel.
    #[test]
    fn size_hint() {
        fn eq(body: Incoming, b: SizeHint, note: &str) {
            let a = body.size_hint();
            assert_eq!(a.lower(), b.lower(), "lower for {:?}", note);
            assert_eq!(a.upper(), b.upper(), "upper for {:?}", note);
        }

        eq(Incoming::empty(), SizeHint::with_exact(0), "empty");

        eq(Incoming::channel().1, SizeHint::new(), "channel");

        eq(
            Incoming::new_channel(DecodedLength::new(4), false).1,
            SizeHint::with_exact(4),
            "channel with length",
        );
    }

    /// Aborting the sender surfaces a "body write aborted" error on the body.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn channel_abort() {
        let (tx, mut rx) = Incoming::channel();

        tx.abort();

        let err = rx.frame().await.unwrap().unwrap_err();
        assert!(err.is_body_write_aborted(), "{:?}", err);
    }

    /// The abort error is still delivered when a chunk is already buffered.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn channel_abort_when_buffer_is_full() {
        let (mut tx, mut rx) = Incoming::channel();

        tx.try_send_data("chunk 1".into()).expect("send 1");
        // A chunk is already buffered; the abort must still get through.
        tx.abort();

        let chunk1 = rx
            .frame()
            .await
            .expect("item 1")
            .expect("chunk 1")
            .into_data()
            .unwrap();
        assert_eq!(chunk1, "chunk 1");

        let err = rx.frame().await.unwrap().unwrap_err();
        assert!(err.is_body_write_aborted(), "{:?}", err);
    }

    /// One chunk can be buffered without the body being polled; a second
    /// one is rejected and handed back.
    #[test]
    fn channel_buffers_one() {
        let (mut tx, _rx) = Incoming::channel();

        tx.try_send_data("chunk 1".into()).expect("send 1");

        let chunk2 = tx.try_send_data("chunk 2".into()).expect_err("send 2");
        assert_eq!(chunk2, "chunk 2");
    }

    /// A body whose sender was dropped without sending anything ends
    /// immediately.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn channel_empty() {
        let (_, mut rx) = Incoming::channel();

        assert!(rx.frame().await.is_none());
    }

    /// Without the "wanter" gating the sender is ready right away.
    #[test]
    fn channel_ready() {
        let (mut tx, _rx) = Incoming::new_channel(DecodedLength::CHUNKED, false);

        let mut tx_ready = tokio_test::task::spawn(tx.ready());

        assert!(tx_ready.poll().is_ready(), "tx is ready immediately");
    }

    /// With the "wanter" gating the sender becomes ready only after the
    /// body has been polled.
    #[test]
    fn channel_wanter() {
        let (mut tx, mut rx) = Incoming::new_channel(DecodedLength::CHUNKED, true);

        let mut tx_ready = tokio_test::task::spawn(tx.ready());
        let mut rx_data = tokio_test::task::spawn(rx.frame());

        assert!(
            tx_ready.poll().is_pending(),
            "tx isn't ready before rx has been polled"
        );

        assert!(rx_data.poll().is_pending(), "poll rx.data");
        assert!(tx_ready.is_woken(), "rx poll wakes tx");

        assert!(
            tx_ready.poll().is_ready(),
            "tx is ready after rx has been polled"
        );
    }

    /// Dropping the body wakes a waiting sender, which then sees "closed".
    #[test]
    fn channel_notices_closure() {
        let (mut tx, rx) = Incoming::new_channel(DecodedLength::CHUNKED, true);

        let mut tx_ready = tokio_test::task::spawn(tx.ready());

        assert!(
            tx_ready.poll().is_pending(),
            "tx isn't ready before rx has been polled"
        );

        drop(rx);
        assert!(tx_ready.is_woken(), "dropping rx wakes tx");

        match tx_ready.poll() {
            Poll::Ready(Err(ref e)) if e.is_closed() => (),
            unexpected => panic!("tx poll ready unexpected: {:?}", unexpected),
        }
    }
}