1pub mod upgrade;
4
5use hyper::service::HttpService;
6use std::future::Future;
7use std::marker::PhantomPinned;
8use std::mem::MaybeUninit;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11use std::{error::Error as StdError, io, time::Duration};
12
13use bytes::Bytes;
14use futures_core::ready;
15use http::{Request, Response};
16use http_body::Body;
17use hyper::{
18 body::Incoming,
19 rt::{Read, ReadBuf, Timer, Write},
20 service::Service,
21};
22
23#[cfg(feature = "http1")]
24use hyper::server::conn::http1;
25
26#[cfg(feature = "http2")]
27use hyper::{rt::bounds::Http2ServerConnExec, server::conn::http2};
28
29#[cfg(any(not(feature = "http2"), not(feature = "http1")))]
30use std::marker::PhantomData;
31
32use pin_project_lite::pin_project;
33
34use crate::common::rewind::Rewind;
35
/// Boxed, thread-safe error type produced by the connection futures in this module.
type Error = Box<dyn StdError + Send + Sync>;

/// Shorthand for a `Result` whose error side is this module's boxed [`Error`].
type Result<T> = std::result::Result<T, Error>;

/// The 24 bytes a client sends first when it opens an HTTP/2 connection.
///
/// Incoming bytes are compared against this prefix to decide which protocol to serve.
const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
41
42#[cfg(feature = "http2")]
44pub trait HttpServerConnExec<A, B: Body>: Http2ServerConnExec<A, B> {}
45
46#[cfg(feature = "http2")]
47impl<A, B: Body, T: Http2ServerConnExec<A, B>> HttpServerConnExec<A, B> for T {}
48
49#[cfg(not(feature = "http2"))]
51pub trait HttpServerConnExec<A, B: Body> {}
52
53#[cfg(not(feature = "http2"))]
54impl<A, B: Body, T> HttpServerConnExec<A, B> for T {}
55
56#[derive(Clone, Debug)]
58pub struct Builder<E> {
59 #[cfg(feature = "http1")]
60 http1: http1::Builder,
61 #[cfg(feature = "http2")]
62 http2: http2::Builder<E>,
63 #[cfg(any(feature = "http1", feature = "http2"))]
64 version: Option<Version>,
65 #[cfg(not(feature = "http2"))]
66 _executor: E,
67}
68
69impl<E: Default> Default for Builder<E> {
70 fn default() -> Self {
71 Self::new(E::default())
72 }
73}
74
75impl<E> Builder<E> {
76 pub fn new(executor: E) -> Self {
92 Self {
93 #[cfg(feature = "http1")]
94 http1: http1::Builder::new(),
95 #[cfg(feature = "http2")]
96 http2: http2::Builder::new(executor),
97 #[cfg(any(feature = "http1", feature = "http2"))]
98 version: None,
99 #[cfg(not(feature = "http2"))]
100 _executor: executor,
101 }
102 }
103
104 #[cfg(feature = "http1")]
106 pub fn http1(&mut self) -> Http1Builder<'_, E> {
107 Http1Builder { inner: self }
108 }
109
110 #[cfg(feature = "http2")]
112 pub fn http2(&mut self) -> Http2Builder<'_, E> {
113 Http2Builder { inner: self }
114 }
115
116 #[cfg(feature = "http2")]
122 pub fn http2_only(mut self) -> Self {
123 assert!(self.version.is_none());
124 self.version = Some(Version::H2);
125 self
126 }
127
128 #[cfg(feature = "http1")]
134 pub fn http1_only(mut self) -> Self {
135 assert!(self.version.is_none());
136 self.version = Some(Version::H1);
137 self
138 }
139
140 pub fn is_http1_available(&self) -> bool {
142 match self.version {
143 #[cfg(feature = "http1")]
144 Some(Version::H1) => true,
145 #[cfg(feature = "http2")]
146 Some(Version::H2) => false,
147 #[cfg(any(feature = "http1", feature = "http2"))]
148 _ => true,
149 }
150 }
151
152 pub fn is_http2_available(&self) -> bool {
154 match self.version {
155 #[cfg(feature = "http1")]
156 Some(Version::H1) => false,
157 #[cfg(feature = "http2")]
158 Some(Version::H2) => true,
159 #[cfg(any(feature = "http1", feature = "http2"))]
160 _ => true,
161 }
162 }
163
164 #[cfg(feature = "http1")]
184 pub fn title_case_headers(mut self, enabled: bool) -> Self {
185 self.http1.title_case_headers(enabled);
186 self
187 }
188
189 #[cfg(feature = "http1")]
208 pub fn preserve_header_case(mut self, enabled: bool) -> Self {
209 self.http1.preserve_header_case(enabled);
210 self
211 }
212
213 pub fn serve_connection<I, S, B>(&self, io: I, service: S) -> Connection<'_, I, S, E>
215 where
216 S: Service<Request<Incoming>, Response = Response<B>>,
217 S::Future: 'static,
218 S::Error: Into<Box<dyn StdError + Send + Sync>>,
219 B: Body + 'static,
220 B::Error: Into<Box<dyn StdError + Send + Sync>>,
221 I: Read + Write + Unpin + 'static,
222 E: HttpServerConnExec<S::Future, B>,
223 {
224 let state = match self.version {
225 #[cfg(feature = "http1")]
226 Some(Version::H1) => {
227 let io = Rewind::new_buffered(io, Bytes::new());
228 let conn = self.http1.serve_connection(io, service);
229 ConnState::H1 { conn }
230 }
231 #[cfg(feature = "http2")]
232 Some(Version::H2) => {
233 let io = Rewind::new_buffered(io, Bytes::new());
234 let conn = self.http2.serve_connection(io, service);
235 ConnState::H2 { conn }
236 }
237 #[cfg(any(feature = "http1", feature = "http2"))]
238 _ => ConnState::ReadVersion {
239 read_version: read_version(io),
240 builder: Cow::Borrowed(self),
241 service: Some(service),
242 },
243 };
244
245 Connection { state }
246 }
247
248 pub fn serve_connection_with_upgrades<I, S, B>(
258 &self,
259 io: I,
260 service: S,
261 ) -> UpgradeableConnection<'_, I, S, E>
262 where
263 S: Service<Request<Incoming>, Response = Response<B>>,
264 S::Future: 'static,
265 S::Error: Into<Box<dyn StdError + Send + Sync>>,
266 B: Body + 'static,
267 B::Error: Into<Box<dyn StdError + Send + Sync>>,
268 I: Read + Write + Unpin + Send + 'static,
269 E: HttpServerConnExec<S::Future, B>,
270 {
271 UpgradeableConnection {
272 state: UpgradeableConnState::ReadVersion {
273 read_version: read_version(io),
274 builder: Cow::Borrowed(self),
275 service: Some(service),
276 },
277 }
278 }
279}
280
281#[derive(Copy, Clone, Debug)]
282enum Version {
283 H1,
284 H2,
285}
286
287impl Version {
288 #[must_use]
289 #[cfg(any(not(feature = "http2"), not(feature = "http1")))]
290 pub fn unsupported(self) -> Error {
291 match self {
292 Version::H1 => Error::from("HTTP/1 is not supported"),
293 Version::H2 => Error::from("HTTP/2 is not supported"),
294 }
295 }
296}
297
298fn read_version<I>(io: I) -> ReadVersion<I>
299where
300 I: Read + Unpin,
301{
302 ReadVersion {
303 io: Some(io),
304 buf: [MaybeUninit::uninit(); 24],
305 filled: 0,
306 version: Version::H2,
307 cancelled: false,
308 _pin: PhantomPinned,
309 }
310}
311
312pin_project! {
313 struct ReadVersion<I> {
314 io: Option<I>,
315 buf: [MaybeUninit<u8>; 24],
316 filled: usize,
318 version: Version,
319 cancelled: bool,
320 #[pin]
322 _pin: PhantomPinned,
323 }
324}
325
326impl<I> ReadVersion<I> {
327 pub fn cancel(self: Pin<&mut Self>) {
328 *self.project().cancelled = true;
329 }
330}
331
/// Reads from the transport until the bytes either match the whole HTTP/2
/// preface (`Version::H2`) or diverge from it / hit EOF (`Version::H1`).
///
/// On completion the transport is returned wrapped in a [`Rewind`] that
/// carries the bytes read during detection.
impl<I> Future for ReadVersion<I>
where
    I: Read + Unpin,
{
    type Output = io::Result<(Version, Rewind<I>)>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        // A cancelled future fails immediately instead of touching the transport.
        if *this.cancelled {
            return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "Cancelled")));
        }

        // Rebuild the read cursor over the persistent buffer on every poll.
        let mut buf = ReadBuf::uninit(&mut *this.buf);
        // SAFETY: `filled` is 0 initially and is otherwise only set below to
        // `buf.filled().len()` after a `poll_read`, so the first `filled` bytes
        // were written by an earlier read (assumes the `Read` impl upholds the
        // `ReadBuf` contract).
        unsafe {
            buf.unfilled().advance(*this.filled);
        };

        // We start as H2 and switch to H1 as soon as the bytes stop matching
        // the preface.
        while buf.filled().len() < H2_PREFACE.len() {
            let len = buf.filled().len();
            // Panics if polled again after completion, because `io` was taken.
            ready!(Pin::new(this.io.as_mut().unwrap()).poll_read(cx, buf.unfilled()))?;
            // Persist progress so a later `Pending` does not lose these bytes.
            *this.filled = buf.filled().len();

            // No new bytes (EOF) or a mismatch in the newly read bytes: this
            // is not an HTTP/2 preface.
            if buf.filled().len() == len
                || buf.filled()[len..] != H2_PREFACE[len..buf.filled().len()]
            {
                *this.version = Version::H1;
                break;
            }
        }

        let io = this.io.take().unwrap();
        // Hand the consumed bytes back so the protocol implementation sees them.
        let buf = buf.filled().to_vec();
        Poll::Ready(Ok((
            *this.version,
            Rewind::new_buffered(io, Bytes::from(buf)),
        )))
    }
}
374
pin_project! {
    /// Connection future returned by `Builder::serve_connection`.
    ///
    /// It first detects the protocol version (unless the builder was
    /// restricted to one) and then drives the HTTP/1 or HTTP/2 connection.
    #[must_use = "futures do nothing unless polled"]
    pub struct Connection<'a, I, S, E>
    where
        S: HttpService<Incoming>,
    {
        // Current phase: version detection, HTTP/1, or HTTP/2.
        #[pin]
        state: ConnState<'a, I, S, E>,
    }
}
390
/// Either a borrowed or an owned `T`, readable through `Deref`.
///
/// Unlike `std::borrow::Cow`, this places no `ToOwned` requirement on `T`.
enum Cow<'a, T> {
    Borrowed(&'a T),
    Owned(T),
}

impl<T> std::ops::Deref for Cow<'_, T> {
    type Target = T;

    /// Returns a reference to the wrapped value, whichever variant holds it.
    fn deref(&self) -> &T {
        match *self {
            Cow::Borrowed(borrowed) => borrowed,
            Cow::Owned(ref owned) => owned,
        }
    }
}
406
407#[cfg(feature = "http1")]
408type Http1Connection<I, S> = hyper::server::conn::http1::Connection<Rewind<I>, S>;
409
410#[cfg(not(feature = "http1"))]
411type Http1Connection<I, S> = (PhantomData<I>, PhantomData<S>);
412
413#[cfg(feature = "http2")]
414type Http2Connection<I, S, E> = hyper::server::conn::http2::Connection<Rewind<I>, S, E>;
415
416#[cfg(not(feature = "http2"))]
417type Http2Connection<I, S, E> = (PhantomData<I>, PhantomData<S>, PhantomData<E>);
418
// State machine behind `Connection`.
pin_project! {
    #[project = ConnStateProj]
    enum ConnState<'a, I, S, E>
    where
        S: HttpService<Incoming>,
    {
        // Still sniffing the protocol; `builder` and `service` are kept until
        // the protocol-specific connection can be created.
        ReadVersion {
            #[pin]
            read_version: ReadVersion<I>,
            builder: Cow<'a, Builder<E>>,
            // `Option` so the service can be moved out through a `&mut` projection.
            service: Option<S>,
        },
        // Serving as HTTP/1.
        H1 {
            #[pin]
            conn: Http1Connection<I, S>,
        },
        // Serving as HTTP/2.
        H2 {
            #[pin]
            conn: Http2Connection<I, S, E>,
        },
    }
}
441
442impl<I, S, E, B> Connection<'_, I, S, E>
443where
444 S: HttpService<Incoming, ResBody = B>,
445 S::Error: Into<Box<dyn StdError + Send + Sync>>,
446 I: Read + Write + Unpin,
447 B: Body + 'static,
448 B::Error: Into<Box<dyn StdError + Send + Sync>>,
449 E: HttpServerConnExec<S::Future, B>,
450{
451 pub fn graceful_shutdown(self: Pin<&mut Self>) {
460 match self.project().state.project() {
461 ConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
462 #[cfg(feature = "http1")]
463 ConnStateProj::H1 { conn } => conn.graceful_shutdown(),
464 #[cfg(feature = "http2")]
465 ConnStateProj::H2 { conn } => conn.graceful_shutdown(),
466 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
467 _ => unreachable!(),
468 }
469 }
470
471 pub fn into_owned(self) -> Connection<'static, I, S, E>
473 where
474 Builder<E>: Clone,
475 {
476 Connection {
477 state: match self.state {
478 ConnState::ReadVersion {
479 read_version,
480 builder,
481 service,
482 } => ConnState::ReadVersion {
483 read_version,
484 service,
485 builder: Cow::Owned(builder.clone()),
486 },
487 #[cfg(feature = "http1")]
488 ConnState::H1 { conn } => ConnState::H1 { conn },
489 #[cfg(feature = "http2")]
490 ConnState::H2 { conn } => ConnState::H2 { conn },
491 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
492 _ => unreachable!(),
493 },
494 }
495 }
496}
497
/// Drives version detection and then the protocol-specific connection.
///
/// Resolves with the result of the underlying HTTP/1 or HTTP/2 connection, or
/// with an error if detection fails or the detected version is not compiled in.
impl<I, S, E, B> Future for Connection<'_, I, S, E>
where
    S: Service<Request<Incoming>, Response = Response<B>>,
    S::Future: 'static,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: Body + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    I: Read + Write + Unpin + 'static,
    E: HttpServerConnExec<S::Future, B>,
{
    type Output = Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Loop so that, right after switching out of `ReadVersion`, the new
        // connection is polled within the same call.
        loop {
            let mut this = self.as_mut().project();

            match this.state.as_mut().project() {
                ConnStateProj::ReadVersion {
                    read_version,
                    builder,
                    service,
                } => {
                    let (version, io) = ready!(read_version.poll(cx))?;
                    // The service is stored as `Some` when this state is
                    // created and taken exactly once here.
                    let service = service.take().unwrap();
                    match version {
                        #[cfg(feature = "http1")]
                        Version::H1 => {
                            let conn = builder.http1.serve_connection(io, service);
                            this.state.set(ConnState::H1 { conn });
                        }
                        #[cfg(feature = "http2")]
                        Version::H2 => {
                            let conn = builder.http2.serve_connection(io, service);
                            this.state.set(ConnState::H2 { conn });
                        }
                        // Only compiled when one protocol is disabled: the
                        // detected version has no implementation available.
                        #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
                        _ => return Poll::Ready(Err(version.unsupported())),
                    }
                }
                #[cfg(feature = "http1")]
                ConnStateProj::H1 { conn } => {
                    return conn.poll(cx).map_err(Into::into);
                }
                #[cfg(feature = "http2")]
                ConnStateProj::H2 { conn } => {
                    return conn.poll(cx).map_err(Into::into);
                }
                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
                _ => unreachable!(),
            }
        }
    }
}
551
pin_project! {
    /// Connection future returned by `Builder::serve_connection_with_upgrades`.
    ///
    /// Behaves like [`Connection`], except that the HTTP/1 side is created
    /// with upgrade support.
    #[must_use = "futures do nothing unless polled"]
    pub struct UpgradeableConnection<'a, I, S, E>
    where
        S: HttpService<Incoming>,
    {
        // Current phase: version detection, HTTP/1 (upgradeable), or HTTP/2.
        #[pin]
        state: UpgradeableConnState<'a, I, S, E>,
    }
}
567
568#[cfg(feature = "http1")]
569type Http1UpgradeableConnection<I, S> = hyper::server::conn::http1::UpgradeableConnection<I, S>;
570
571#[cfg(not(feature = "http1"))]
572type Http1UpgradeableConnection<I, S> = (PhantomData<I>, PhantomData<S>);
573
// State machine behind `UpgradeableConnection`.
pin_project! {
    #[project = UpgradeableConnStateProj]
    enum UpgradeableConnState<'a, I, S, E>
    where
        S: HttpService<Incoming>,
    {
        // Still sniffing the protocol; `builder` and `service` are kept until
        // the protocol-specific connection can be created.
        ReadVersion {
            #[pin]
            read_version: ReadVersion<I>,
            builder: Cow<'a, Builder<E>>,
            // `Option` so the service can be moved out through a `&mut` projection.
            service: Option<S>,
        },
        // Serving as HTTP/1 with upgrade support.
        H1 {
            #[pin]
            conn: Http1UpgradeableConnection<Rewind<I>, S>,
        },
        // Serving as HTTP/2.
        H2 {
            #[pin]
            conn: Http2Connection<I, S, E>,
        },
    }
}
596
597impl<I, S, E, B> UpgradeableConnection<'_, I, S, E>
598where
599 S: HttpService<Incoming, ResBody = B>,
600 S::Error: Into<Box<dyn StdError + Send + Sync>>,
601 I: Read + Write + Unpin,
602 B: Body + 'static,
603 B::Error: Into<Box<dyn StdError + Send + Sync>>,
604 E: HttpServerConnExec<S::Future, B>,
605{
606 pub fn graceful_shutdown(self: Pin<&mut Self>) {
615 match self.project().state.project() {
616 UpgradeableConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
617 #[cfg(feature = "http1")]
618 UpgradeableConnStateProj::H1 { conn } => conn.graceful_shutdown(),
619 #[cfg(feature = "http2")]
620 UpgradeableConnStateProj::H2 { conn } => conn.graceful_shutdown(),
621 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
622 _ => unreachable!(),
623 }
624 }
625
626 pub fn into_owned(self) -> UpgradeableConnection<'static, I, S, E>
628 where
629 Builder<E>: Clone,
630 {
631 UpgradeableConnection {
632 state: match self.state {
633 UpgradeableConnState::ReadVersion {
634 read_version,
635 builder,
636 service,
637 } => UpgradeableConnState::ReadVersion {
638 read_version,
639 service,
640 builder: Cow::Owned(builder.clone()),
641 },
642 #[cfg(feature = "http1")]
643 UpgradeableConnState::H1 { conn } => UpgradeableConnState::H1 { conn },
644 #[cfg(feature = "http2")]
645 UpgradeableConnState::H2 { conn } => UpgradeableConnState::H2 { conn },
646 #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
647 _ => unreachable!(),
648 },
649 }
650 }
651}
652
/// Drives version detection and then the protocol-specific connection, with
/// the HTTP/1 connection created via `with_upgrades()`.
///
/// Resolves with the result of the underlying connection, or with an error if
/// detection fails or the detected version is not compiled in.
impl<I, S, E, B> Future for UpgradeableConnection<'_, I, S, E>
where
    S: Service<Request<Incoming>, Response = Response<B>>,
    S::Future: 'static,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: Body + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    I: Read + Write + Unpin + Send + 'static,
    E: HttpServerConnExec<S::Future, B>,
{
    type Output = Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Loop so that, right after switching out of `ReadVersion`, the new
        // connection is polled within the same call.
        loop {
            let mut this = self.as_mut().project();

            match this.state.as_mut().project() {
                UpgradeableConnStateProj::ReadVersion {
                    read_version,
                    builder,
                    service,
                } => {
                    let (version, io) = ready!(read_version.poll(cx))?;
                    // The service is stored as `Some` when this state is
                    // created and taken exactly once here.
                    let service = service.take().unwrap();
                    match version {
                        #[cfg(feature = "http1")]
                        Version::H1 => {
                            let conn = builder.http1.serve_connection(io, service).with_upgrades();
                            this.state.set(UpgradeableConnState::H1 { conn });
                        }
                        #[cfg(feature = "http2")]
                        Version::H2 => {
                            let conn = builder.http2.serve_connection(io, service);
                            this.state.set(UpgradeableConnState::H2 { conn });
                        }
                        // Only compiled when one protocol is disabled: the
                        // detected version has no implementation available.
                        #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
                        _ => return Poll::Ready(Err(version.unsupported())),
                    }
                }
                #[cfg(feature = "http1")]
                UpgradeableConnStateProj::H1 { conn } => {
                    return conn.poll(cx).map_err(Into::into);
                }
                #[cfg(feature = "http2")]
                UpgradeableConnStateProj::H2 { conn } => {
                    return conn.poll(cx).map_err(Into::into);
                }
                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
                _ => unreachable!(),
            }
        }
    }
}
706
/// Handle for configuring the HTTP/1 options of a [`Builder`].
///
/// Every setter forwards its argument to the same-named option on the wrapped
/// hyper HTTP/1 builder and returns `&mut Self` for chaining; see hyper's
/// `server::conn::http1::Builder` for what each option does.
#[cfg(feature = "http1")]
pub struct Http1Builder<'a, E> {
    inner: &'a mut Builder<E>,
}

#[cfg(feature = "http1")]
impl<E> Http1Builder<'_, E> {
    /// Switches to configuring the HTTP/2 options of the same builder.
    #[cfg(feature = "http2")]
    pub fn http2(&mut self) -> Http2Builder<'_, E> {
        Http2Builder { inner: self.inner }
    }

    /// Forwards `enabled` to the HTTP/1 builder's `auto_date_header` option.
    pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.auto_date_header(enabled);
        self
    }

    /// Forwards `val` to the HTTP/1 builder's `half_close` option.
    pub fn half_close(&mut self, val: bool) -> &mut Self {
        self.inner.http1.half_close(val);
        self
    }

    /// Forwards `val` to the HTTP/1 builder's `keep_alive` option.
    pub fn keep_alive(&mut self, val: bool) -> &mut Self {
        self.inner.http1.keep_alive(val);
        self
    }

    /// Forwards `enabled` to the HTTP/1 builder's `title_case_headers` option.
    pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.title_case_headers(enabled);
        self
    }

    /// Forwards `enabled` to the HTTP/1 builder's `ignore_invalid_headers` option.
    pub fn ignore_invalid_headers(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.ignore_invalid_headers(enabled);
        self
    }

    /// Forwards `enabled` to the HTTP/1 builder's `preserve_header_case` option.
    pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.preserve_header_case(enabled);
        self
    }

    /// Forwards `val` to the HTTP/1 builder's `max_headers` option.
    pub fn max_headers(&mut self, val: usize) -> &mut Self {
        self.inner.http1.max_headers(val);
        self
    }

    /// Forwards `read_timeout` to the HTTP/1 builder's `header_read_timeout` option.
    pub fn header_read_timeout(&mut self, read_timeout: impl Into<Option<Duration>>) -> &mut Self {
        self.inner.http1.header_read_timeout(read_timeout);
        self
    }

    /// Forwards `val` to the HTTP/1 builder's `writev` option.
    pub fn writev(&mut self, val: bool) -> &mut Self {
        self.inner.http1.writev(val);
        self
    }

    /// Forwards `max` to the HTTP/1 builder's `max_buf_size` option.
    pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
        self.inner.http1.max_buf_size(max);
        self
    }

    /// Forwards `enabled` to the HTTP/1 builder's `pipeline_flush` option.
    pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.pipeline_flush(enabled);
        self
    }

    /// Installs `timer` on the HTTP/1 builder.
    pub fn timer<M>(&mut self, timer: M) -> &mut Self
    where
        M: Timer + Send + Sync + 'static,
    {
        self.inner.http1.timer(timer);
        self
    }

    /// Serves `io` with `service` through the underlying [`Builder`] and
    /// awaits the connection to completion.
    #[cfg(feature = "http2")]
    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + 'static,
        E: HttpServerConnExec<S::Future, B>,
    {
        self.inner.serve_connection(io, service).await
    }

    /// Serves `io` with `service` through the underlying [`Builder`] and
    /// awaits the connection to completion.
    ///
    /// This variant is compiled without the `http2` feature and therefore
    /// carries no executor bound.
    #[cfg(not(feature = "http2"))]
    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + 'static,
    {
        self.inner.serve_connection(io, service).await
    }

    /// Returns the upgrade-capable connection future from the underlying
    /// [`Builder`]; unlike `serve_connection`, `io` must also be `Send`.
    #[cfg(feature = "http2")]
    pub fn serve_connection_with_upgrades<I, S, B>(
        &self,
        io: I,
        service: S,
    ) -> UpgradeableConnection<'_, I, S, E>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + Send + 'static,
        E: HttpServerConnExec<S::Future, B>,
    {
        self.inner.serve_connection_with_upgrades(io, service)
    }
}
925
/// Handle for configuring the HTTP/2 options of a [`Builder`].
///
/// Every setter forwards its argument to the same-named option on the wrapped
/// hyper HTTP/2 builder and returns `&mut Self` for chaining; see hyper's
/// `server::conn::http2::Builder` for what each option does.
#[cfg(feature = "http2")]
pub struct Http2Builder<'a, E> {
    inner: &'a mut Builder<E>,
}

#[cfg(feature = "http2")]
impl<E> Http2Builder<'_, E> {
    /// Switches to configuring the HTTP/1 options of the same builder.
    #[cfg(feature = "http1")]
    pub fn http1(&mut self) -> Http1Builder<'_, E> {
        Http1Builder { inner: self.inner }
    }

    /// Forwards `max` to the HTTP/2 builder's `max_pending_accept_reset_streams` option.
    pub fn max_pending_accept_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
        self.inner.http2.max_pending_accept_reset_streams(max);
        self
    }

    /// Forwards `max` to the HTTP/2 builder's `max_local_error_reset_streams` option.
    pub fn max_local_error_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
        self.inner.http2.max_local_error_reset_streams(max);
        self
    }

    /// Forwards `sz` to the HTTP/2 builder's `initial_stream_window_size` option.
    pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
        self.inner.http2.initial_stream_window_size(sz);
        self
    }

    /// Forwards `sz` to the HTTP/2 builder's `initial_connection_window_size` option.
    pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
        self.inner.http2.initial_connection_window_size(sz);
        self
    }

    /// Forwards `enabled` to the HTTP/2 builder's `adaptive_window` option.
    pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self {
        self.inner.http2.adaptive_window(enabled);
        self
    }

    /// Forwards `sz` to the HTTP/2 builder's `max_frame_size` option.
    pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
        self.inner.http2.max_frame_size(sz);
        self
    }

    /// Forwards `max` to the HTTP/2 builder's `max_concurrent_streams` option.
    pub fn max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
        self.inner.http2.max_concurrent_streams(max);
        self
    }

    /// Forwards `interval` to the HTTP/2 builder's `keep_alive_interval` option.
    pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
        self.inner.http2.keep_alive_interval(interval);
        self
    }

    /// Forwards `timeout` to the HTTP/2 builder's `keep_alive_timeout` option.
    pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.inner.http2.keep_alive_timeout(timeout);
        self
    }

    /// Forwards `max` to the HTTP/2 builder's `max_send_buf_size` option.
    pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self {
        self.inner.http2.max_send_buf_size(max);
        self
    }

    /// Calls `enable_connect_protocol` on the HTTP/2 builder.
    pub fn enable_connect_protocol(&mut self) -> &mut Self {
        self.inner.http2.enable_connect_protocol();
        self
    }

    /// Forwards `max` to the HTTP/2 builder's `max_header_list_size` option.
    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
        self.inner.http2.max_header_list_size(max);
        self
    }

    /// Installs `timer` on the HTTP/2 builder.
    pub fn timer<M>(&mut self, timer: M) -> &mut Self
    where
        M: Timer + Send + Sync + 'static,
    {
        self.inner.http2.timer(timer);
        self
    }

    /// Forwards `enabled` to the HTTP/2 builder's `auto_date_header` option.
    pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self {
        self.inner.http2.auto_date_header(enabled);
        self
    }

    /// Serves `io` with `service` through the underlying [`Builder`] and
    /// awaits the connection to completion.
    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + 'static,
        E: HttpServerConnExec<S::Future, B>,
    {
        self.inner.serve_connection(io, service).await
    }

    /// Returns the upgrade-capable connection future from the underlying
    /// [`Builder`]; unlike `serve_connection`, `io` must also be `Send`.
    pub fn serve_connection_with_upgrades<I, S, B>(
        &self,
        io: I,
        service: S,
    ) -> UpgradeableConnection<'_, I, S, E>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + Send + 'static,
        E: HttpServerConnExec<S::Future, B>,
    {
        self.inner.serve_connection_with_upgrades(io, service)
    }
}
1127
#[cfg(test)]
mod tests {
    use crate::{
        rt::{TokioExecutor, TokioIo},
        server::conn::auto,
    };
    use http::{Request, Response};
    use http_body::Body;
    use http_body_util::{BodyExt, Empty, Full};
    use hyper::{body, body::Bytes, client, service::service_fn};
    use std::{convert::Infallible, error::Error as StdError, net::SocketAddr, time::Duration};
    use tokio::{
        net::{TcpListener, TcpStream},
        pin,
    };

    // Response body served by `hello` and expected by the client-side tests.
    const BODY: &[u8] = b"Hello, world!";

    // Compile-time check that the configuration API can be used both as one
    // chained expression and through a named builder.
    #[test]
    fn configuration() {
        // One chained expression, hopping between the two sub-builders.
        auto::Builder::new(TokioExecutor::new())
            .http1()
            .keep_alive(true)
            .http2()
            .keep_alive_interval(None);
        let mut builder = auto::Builder::new(TokioExecutor::new());

        // Separate statements on a named builder.
        builder.http1().keep_alive(true);
        builder.http2().keep_alive_interval(None);
    }

    // `title_case_headers` on `Builder` is by-value and can be chained with
    // other by-value methods.
    #[test]
    #[cfg(feature = "http1")]
    fn title_case_headers_configuration() {
        auto::Builder::new(TokioExecutor::new()).title_case_headers(true);

        auto::Builder::new(TokioExecutor::new())
            .title_case_headers(true)
            .http1_only();
    }

    // `preserve_header_case` on `Builder` is by-value and can be chained with
    // other by-value methods.
    #[test]
    #[cfg(feature = "http1")]
    fn preserve_header_case_configuration() {
        auto::Builder::new(TokioExecutor::new()).preserve_header_case(true);

        auto::Builder::new(TokioExecutor::new())
            .preserve_header_case(true)
            .http1_only();
    }

    // Unrestricted server, HTTP/1 client.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn http1() {
        let addr = start_server(false, false).await;
        let mut sender = connect_h1(addr).await;

        let response = sender
            .send_request(Request::new(Empty::<Bytes>::new()))
            .await
            .unwrap();

        let body = response.into_body().collect().await.unwrap().to_bytes();

        assert_eq!(body, BODY);
    }

    // Unrestricted server, HTTP/2 client.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn http2() {
        let addr = start_server(false, false).await;
        let mut sender = connect_h2(addr).await;

        let response = sender
            .send_request(Request::new(Empty::<Bytes>::new()))
            .await
            .unwrap();

        let body = response.into_body().collect().await.unwrap().to_bytes();

        assert_eq!(body, BODY);
    }

    // HTTP/2-only server, HTTP/2 client.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn http2_only() {
        let addr = start_server(false, true).await;
        let mut sender = connect_h2(addr).await;

        let response = sender
            .send_request(Request::new(Empty::<Bytes>::new()))
            .await
            .unwrap();

        let body = response.into_body().collect().await.unwrap().to_bytes();

        assert_eq!(body, BODY);
    }

    // HTTP/2-only server must reject an HTTP/1 client.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn http2_only_fail_if_client_is_http1() {
        let addr = start_server(false, true).await;
        let mut sender = connect_h1(addr).await;

        let _ = sender
            .send_request(Request::new(Empty::<Bytes>::new()))
            .await
            .expect_err("should fail");
    }

    // HTTP/1-only server, HTTP/1 client.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn http1_only() {
        let addr = start_server(true, false).await;
        let mut sender = connect_h1(addr).await;

        let response = sender
            .send_request(Request::new(Empty::<Bytes>::new()))
            .await
            .unwrap();

        let body = response.into_body().collect().await.unwrap().to_bytes();

        assert_eq!(body, BODY);
    }

    // HTTP/1-only server must reject an HTTP/2 client.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn http1_only_fail_if_client_is_http2() {
        let addr = start_server(true, false).await;
        let mut sender = connect_h2(addr).await;

        let _ = sender
            .send_request(Request::new(Empty::<Bytes>::new()))
            .await
            .expect_err("should fail");
    }

    // Graceful shutdown requested before any bytes arrive must end the
    // connection promptly with an `Interrupted` I/O error.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn graceful_shutdown() {
        let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
            .await
            .unwrap();

        let listener_addr = listener.local_addr().unwrap();

        // Accept in a background task so the connect below can complete.
        let listen_task = tokio::spawn(async move { listener.accept().await.unwrap() });
        // Keep the client socket alive but never write to it.
        let _stream = TcpStream::connect(listener_addr).await.unwrap();

        let (stream, _) = listen_task.await.unwrap();
        let stream = TokioIo::new(stream);
        let builder = auto::Builder::new(TokioExecutor::new());
        let connection = builder.serve_connection(stream, service_fn(hello));

        pin!(connection);

        connection.as_mut().graceful_shutdown();

        let connection_error = tokio::time::timeout(Duration::from_millis(200), connection)
            .await
            .expect("Connection should have finished in a timely manner after graceful shutdown.")
            .expect_err("Connection should have been interrupted.");

        let connection_error = connection_error
            .downcast_ref::<std::io::Error>()
            .expect("The error should have been `std::io::Error`.");
        assert_eq!(connection_error.kind(), std::io::ErrorKind::Interrupted);
    }

    // Opens an HTTP/1 client connection to `addr`, spawns its driver task and
    // returns the request sender.
    async fn connect_h1<B>(addr: SocketAddr) -> client::conn::http1::SendRequest<B>
    where
        B: Body + Send + 'static,
        B::Data: Send,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
    {
        let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap());
        let (sender, connection) = client::conn::http1::handshake(stream).await.unwrap();

        tokio::spawn(connection);

        sender
    }

    // Opens an HTTP/2 client connection to `addr`, spawns its driver task and
    // returns the request sender.
    async fn connect_h2<B>(addr: SocketAddr) -> client::conn::http2::SendRequest<B>
    where
        B: Body + Unpin + Send + 'static,
        B::Data: Send,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
    {
        let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap());
        let (sender, connection) = client::conn::http2::Builder::new(TokioExecutor::new())
            .handshake(stream)
            .await
            .unwrap();

        tokio::spawn(connection);

        sender
    }

    // Starts a background server on an ephemeral local port and returns its
    // address. `h1_only` / `h2_only` restrict the builder to one protocol;
    // with neither set, the upgrade-capable auto-detecting path is used.
    async fn start_server(h1_only: bool, h2_only: bool) -> SocketAddr {
        let addr: SocketAddr = ([127, 0, 0, 1], 0).into();
        let listener = TcpListener::bind(addr).await.unwrap();

        let local_addr = listener.local_addr().unwrap();

        tokio::spawn(async move {
            loop {
                let (stream, _) = listener.accept().await.unwrap();
                let stream = TokioIo::new(stream);
                // One task per accepted connection.
                tokio::task::spawn(async move {
                    let mut builder = auto::Builder::new(TokioExecutor::new());
                    if h1_only {
                        builder = builder.http1_only();
                        builder.serve_connection(stream, service_fn(hello)).await
                    } else if h2_only {
                        builder = builder.http2_only();
                        builder.serve_connection(stream, service_fn(hello)).await
                    } else {
                        builder
                            .http2()
                            .max_header_list_size(4096)
                            .serve_connection_with_upgrades(stream, service_fn(hello))
                            .await
                    }
                    .unwrap();
                });
            }
        });

        local_addr
    }

    // Test service: answers every request with `BODY`.
    async fn hello(_req: Request<body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
        Ok(Response::new(Full::new(Bytes::from(BODY))))
    }
}