hyper_util/server/conn/auto/
mod.rs

1//! Http1 or Http2 connection.
2
3pub mod upgrade;
4
5use hyper::service::HttpService;
6use std::future::Future;
7use std::marker::PhantomPinned;
8use std::mem::MaybeUninit;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11use std::{error::Error as StdError, io, time::Duration};
12
13use bytes::Bytes;
14use futures_core::ready;
15use http::{Request, Response};
16use http_body::Body;
17use hyper::{
18    body::Incoming,
19    rt::{Read, ReadBuf, Timer, Write},
20    service::Service,
21};
22
23#[cfg(feature = "http1")]
24use hyper::server::conn::http1;
25
26#[cfg(feature = "http2")]
27use hyper::{rt::bounds::Http2ServerConnExec, server::conn::http2};
28
29#[cfg(any(not(feature = "http2"), not(feature = "http1")))]
30use std::marker::PhantomData;
31
32use pin_project_lite::pin_project;
33
34use crate::common::rewind::Rewind;
35
/// Boxed, thread-safe error type used for this module's fallible results.
type Error = Box<dyn std::error::Error + Send + Sync>;

/// Shorthand for a `Result` whose error type is this module's boxed [`Error`].
type Result<T> = std::result::Result<T, Error>;

/// Byte sequence that the first bytes read from a connection are compared against when
/// choosing between HTTP/2 (full match) and HTTP/1 (anything else).
const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
41
/// Exactly equivalent to [`Http2ServerConnExec`].
///
/// Declared as a sub-trait of [`Http2ServerConnExec`] and blanket-implemented for every
/// type that implements it, so the two bounds are interchangeable.
#[cfg(feature = "http2")]
pub trait HttpServerConnExec<A, B: Body>: Http2ServerConnExec<A, B> {}

// Blanket impl: anything usable as an HTTP/2 connection executor satisfies this trait.
#[cfg(feature = "http2")]
impl<A, B: Body, T: Http2ServerConnExec<A, B>> HttpServerConnExec<A, B> for T {}

/// Stand-in for `Http2ServerConnExec` used when the `http2` feature is disabled.
///
/// It has no requirements and is implemented for every type.
#[cfg(not(feature = "http2"))]
pub trait HttpServerConnExec<A, B: Body> {}

// Blanket impl: without HTTP/2 support every type qualifies.
#[cfg(not(feature = "http2"))]
impl<A, B: Body, T> HttpServerConnExec<A, B> for T {}
55
/// Http1 or Http2 connection builder.
///
/// Holds one protocol-specific builder per enabled feature plus an optional protocol
/// restriction.
#[derive(Clone, Debug)]
pub struct Builder<E> {
    // HTTP/1 settings; only present with the `http1` feature.
    #[cfg(feature = "http1")]
    http1: http1::Builder,
    // HTTP/2 settings (owns the executor); only present with the `http2` feature.
    #[cfg(feature = "http2")]
    http2: http2::Builder<E>,
    // `Some(..)` once `http1_only`/`http2_only` was called; `None` means the protocol is
    // decided by reading the start of each connection.
    #[cfg(any(feature = "http1", feature = "http2"))]
    version: Option<Version>,
    // Holds the executor when there is no HTTP/2 builder to hand it to.
    #[cfg(not(feature = "http2"))]
    _executor: E,
}
68
69impl<E: Default> Default for Builder<E> {
70    fn default() -> Self {
71        Self::new(E::default())
72    }
73}
74
75impl<E> Builder<E> {
    /// Create a new auto connection builder.
    ///
    /// `executor` parameter should be a type that implements
    /// [`Executor`](hyper::rt::Executor) trait.
    ///
    /// The builder starts without a protocol restriction (`version` is `None`).
    ///
    /// # Example
    ///
    /// ```
    /// use hyper_util::{
    ///     rt::TokioExecutor,
    ///     server::conn::auto,
    /// };
    ///
    /// auto::Builder::new(TokioExecutor::new());
    /// ```
    pub fn new(executor: E) -> Self {
        Self {
            #[cfg(feature = "http1")]
            http1: http1::Builder::new(),
            // With HTTP/2 enabled the executor is handed to the HTTP/2 builder...
            #[cfg(feature = "http2")]
            http2: http2::Builder::new(executor),
            #[cfg(any(feature = "http1", feature = "http2"))]
            version: None,
            // ...otherwise it is stored directly on this builder.
            #[cfg(not(feature = "http2"))]
            _executor: executor,
        }
    }
103
104    /// Http1 configuration.
105    #[cfg(feature = "http1")]
106    pub fn http1(&mut self) -> Http1Builder<'_, E> {
107        Http1Builder { inner: self }
108    }
109
110    /// Http2 configuration.
111    #[cfg(feature = "http2")]
112    pub fn http2(&mut self) -> Http2Builder<'_, E> {
113        Http2Builder { inner: self }
114    }
115
116    /// Only accepts HTTP/2
117    ///
118    /// Does not do anything if used with [`serve_connection_with_upgrades`]
119    ///
120    /// [`serve_connection_with_upgrades`]: Builder::serve_connection_with_upgrades
121    #[cfg(feature = "http2")]
122    pub fn http2_only(mut self) -> Self {
123        assert!(self.version.is_none());
124        self.version = Some(Version::H2);
125        self
126    }
127
128    /// Only accepts HTTP/1
129    ///
130    /// Does not do anything if used with [`serve_connection_with_upgrades`]
131    ///
132    /// [`serve_connection_with_upgrades`]: Builder::serve_connection_with_upgrades
133    #[cfg(feature = "http1")]
134    pub fn http1_only(mut self) -> Self {
135        assert!(self.version.is_none());
136        self.version = Some(Version::H1);
137        self
138    }
139
140    /// Returns `true` if this builder can serve an HTTP/1.1-based connection.
141    pub fn is_http1_available(&self) -> bool {
142        match self.version {
143            #[cfg(feature = "http1")]
144            Some(Version::H1) => true,
145            #[cfg(feature = "http2")]
146            Some(Version::H2) => false,
147            #[cfg(any(feature = "http1", feature = "http2"))]
148            _ => true,
149        }
150    }
151
152    /// Returns `true` if this builder can serve an HTTP/2-based connection.
153    pub fn is_http2_available(&self) -> bool {
154        match self.version {
155            #[cfg(feature = "http1")]
156            Some(Version::H1) => false,
157            #[cfg(feature = "http2")]
158            Some(Version::H2) => true,
159            #[cfg(any(feature = "http1", feature = "http2"))]
160            _ => true,
161        }
162    }
163
164    /// Set whether HTTP/1 connections will write header names as title case at
165    /// the socket level.
166    ///
167    /// This setting only affects HTTP/1 connections. HTTP/2 connections are
168    /// not affected by this setting.
169    ///
170    /// Default is false.
171    ///
172    /// # Example
173    ///
174    /// ```
175    /// use hyper_util::{
176    ///     rt::TokioExecutor,
177    ///     server::conn::auto,
178    /// };
179    ///
180    /// auto::Builder::new(TokioExecutor::new())
181    ///     .title_case_headers(true);
182    /// ```
183    #[cfg(feature = "http1")]
184    pub fn title_case_headers(mut self, enabled: bool) -> Self {
185        self.http1.title_case_headers(enabled);
186        self
187    }
188
189    /// Set whether HTTP/1 connections will preserve the original case of header names.
190    ///
191    /// This setting only affects HTTP/1 connections. HTTP/2 connections are
192    /// not affected by this setting.
193    ///
194    /// Default is false.
195    ///
196    /// # Example
197    ///
198    /// ```
199    /// use hyper_util::{
200    ///     rt::TokioExecutor,
201    ///     server::conn::auto,
202    /// };
203    ///
204    /// auto::Builder::new(TokioExecutor::new())
205    ///     .preserve_header_case(true);
206    /// ```
207    #[cfg(feature = "http1")]
208    pub fn preserve_header_case(mut self, enabled: bool) -> Self {
209        self.http1.preserve_header_case(enabled);
210        self
211    }
212
213    /// Bind a connection together with a [`Service`].
214    pub fn serve_connection<I, S, B>(&self, io: I, service: S) -> Connection<'_, I, S, E>
215    where
216        S: Service<Request<Incoming>, Response = Response<B>>,
217        S::Future: 'static,
218        S::Error: Into<Box<dyn StdError + Send + Sync>>,
219        B: Body + 'static,
220        B::Error: Into<Box<dyn StdError + Send + Sync>>,
221        I: Read + Write + Unpin + 'static,
222        E: HttpServerConnExec<S::Future, B>,
223    {
224        let state = match self.version {
225            #[cfg(feature = "http1")]
226            Some(Version::H1) => {
227                let io = Rewind::new_buffered(io, Bytes::new());
228                let conn = self.http1.serve_connection(io, service);
229                ConnState::H1 { conn }
230            }
231            #[cfg(feature = "http2")]
232            Some(Version::H2) => {
233                let io = Rewind::new_buffered(io, Bytes::new());
234                let conn = self.http2.serve_connection(io, service);
235                ConnState::H2 { conn }
236            }
237            #[cfg(any(feature = "http1", feature = "http2"))]
238            _ => ConnState::ReadVersion {
239                read_version: read_version(io),
240                builder: Cow::Borrowed(self),
241                service: Some(service),
242            },
243        };
244
245        Connection { state }
246    }
247
248    /// Bind a connection together with a [`Service`], with the ability to
249    /// handle HTTP upgrades. This requires that the IO object implements
250    /// `Send`.
251    ///
252    /// Note that if you ever want to use [`hyper::upgrade::Upgraded::downcast`]
253    /// with this crate, you'll need to use [`hyper_util::server::conn::auto::upgrade::downcast`]
254    /// instead. See the documentation of the latter to understand why.
255    ///
256    /// [`hyper_util::server::conn::auto::upgrade::downcast`]: crate::server::conn::auto::upgrade::downcast
257    pub fn serve_connection_with_upgrades<I, S, B>(
258        &self,
259        io: I,
260        service: S,
261    ) -> UpgradeableConnection<'_, I, S, E>
262    where
263        S: Service<Request<Incoming>, Response = Response<B>>,
264        S::Future: 'static,
265        S::Error: Into<Box<dyn StdError + Send + Sync>>,
266        B: Body + 'static,
267        B::Error: Into<Box<dyn StdError + Send + Sync>>,
268        I: Read + Write + Unpin + Send + 'static,
269        E: HttpServerConnExec<S::Future, B>,
270    {
271        UpgradeableConnection {
272            state: UpgradeableConnState::ReadVersion {
273                read_version: read_version(io),
274                builder: Cow::Borrowed(self),
275                service: Some(service),
276            },
277        }
278    }
279}
280
281#[derive(Copy, Clone, Debug)]
282enum Version {
283    H1,
284    H2,
285}
286
287impl Version {
288    #[must_use]
289    #[cfg(any(not(feature = "http2"), not(feature = "http1")))]
290    pub fn unsupported(self) -> Error {
291        match self {
292            Version::H1 => Error::from("HTTP/1 is not supported"),
293            Version::H2 => Error::from("HTTP/2 is not supported"),
294        }
295    }
296}
297
298fn read_version<I>(io: I) -> ReadVersion<I>
299where
300    I: Read + Unpin,
301{
302    ReadVersion {
303        io: Some(io),
304        buf: [MaybeUninit::uninit(); 24],
305        filled: 0,
306        version: Version::H2,
307        cancelled: false,
308        _pin: PhantomPinned,
309    }
310}
311
pin_project! {
    // Future that reads the first bytes of a connection to decide between HTTP/1 and
    // HTTP/2, yielding the version together with the IO wrapped in a `Rewind`.
    struct ReadVersion<I> {
        // The IO object; taken out (left as `None`) once the future completes.
        io: Option<I>,
        // Scratch space for the bytes read so far.
        buf: [MaybeUninit<u8>; 24],
        // the amount of `buf` that's been filled
        filled: usize,
        // Current guess for the protocol version.
        version: Version,
        // Set by `cancel`; makes the next poll fail with an `Interrupted` error.
        cancelled: bool,
        // Make this future `!Unpin` for compatibility with async trait methods.
        #[pin]
        _pin: PhantomPinned,
    }
}
325
326impl<I> ReadVersion<I> {
327    pub fn cancel(self: Pin<&mut Self>) {
328        *self.project().cancelled = true;
329    }
330}
331
impl<I> Future for ReadVersion<I>
where
    I: Read + Unpin,
{
    /// The detected version plus the IO wrapped in a [`Rewind`] that replays the bytes
    /// consumed during detection.
    type Output = io::Result<(Version, Rewind<I>)>;

    /// Reads from the IO until the bytes either match the whole HTTP/2 preface or stop
    /// matching it.
    ///
    /// Resolves with an `Interrupted` error if `cancel` was called, and propagates any
    /// read error. Polling again after completion panics because the IO has been taken.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        if *this.cancelled {
            return Poll::Ready(Err(io::Error::new(io::ErrorKind::Interrupted, "Cancelled")));
        }

        // Rebuild the read cursor over the scratch buffer on every poll.
        let mut buf = ReadBuf::uninit(&mut *this.buf);
        // SAFETY: `this.filled` tracks how many bytes have been read (and thus initialized) and
        // we're only advancing by that many.
        unsafe {
            buf.unfilled().advance(*this.filled);
        };

        // We start as H2 and switch to H1 as soon as the bytes read stop matching the preface.
        while buf.filled().len() < H2_PREFACE.len() {
            let len = buf.filled().len();
            ready!(Pin::new(this.io.as_mut().unwrap()).poll_read(cx, buf.unfilled()))?;
            // Remember the progress so a later poll resumes where this one stopped.
            *this.filled = buf.filled().len();

            // A read that added no bytes, or new bytes that differ from the matching
            // slice of the preface, means this is not HTTP/2.
            if buf.filled().len() == len
                || buf.filled()[len..] != H2_PREFACE[len..buf.filled().len()]
            {
                *this.version = Version::H1;
                break;
            }
        }

        // Hand the IO back together with everything consumed so far.
        let io = this.io.take().unwrap();
        let buf = buf.filled().to_vec();
        Poll::Ready(Ok((
            *this.version,
            Rewind::new_buffered(io, Bytes::from(buf)),
        )))
    }
}
374
pin_project! {
    /// A [`Future`](core::future::Future) representing an HTTP/1 or HTTP/2 connection,
    /// returned from
    /// [`Builder::serve_connection`](struct.Builder.html#method.serve_connection).
    ///
    /// To drive HTTP on this connection this future **must be polled**, typically with
    /// `.await`. If it isn't polled, no progress will be made on this connection.
    #[must_use = "futures do nothing unless polled"]
    pub struct Connection<'a, I, S, E>
    where
        S: HttpService<Incoming>,
    {
        // Current stage: still reading the version, or serving HTTP/1 or HTTP/2.
        #[pin]
        state: ConnState<'a, I, S, E>,
    }
}
390
// A custom clone-on-write wrapper, since the `ToOwned` bounds on libstd's `Cow` are too
// eager.
enum Cow<'a, T> {
    Borrowed(&'a T),
    Owned(T),
}

impl<T> std::ops::Deref for Cow<'_, T> {
    type Target = T;

    // Gives shared access to the value whether it is borrowed or owned.
    fn deref(&self) -> &T {
        match self {
            Cow::Borrowed(borrowed) => *borrowed,
            Cow::Owned(owned) => owned,
        }
    }
}
406
// HTTP/1 connection over a rewindable IO object.
#[cfg(feature = "http1")]
type Http1Connection<I, S> = hyper::server::conn::http1::Connection<Rewind<I>, S>;

// Placeholder with the same type parameters when the `http1` feature is disabled.
#[cfg(not(feature = "http1"))]
type Http1Connection<I, S> = (PhantomData<I>, PhantomData<S>);

// HTTP/2 connection over a rewindable IO object.
#[cfg(feature = "http2")]
type Http2Connection<I, S, E> = hyper::server::conn::http2::Connection<Rewind<I>, S, E>;

// Placeholder with the same type parameters when the `http2` feature is disabled.
#[cfg(not(feature = "http2"))]
type Http2Connection<I, S, E> = (PhantomData<I>, PhantomData<S>, PhantomData<E>);
418
pin_project! {
    // State machine behind `Connection`: version detection followed by the
    // protocol-specific connection.
    #[project = ConnStateProj]
    enum ConnState<'a, I, S, E>
    where
        S: HttpService<Incoming>,
    {
        // The protocol is not known yet; `service` is taken out once it is.
        ReadVersion {
            #[pin]
            read_version: ReadVersion<I>,
            builder: Cow<'a, Builder<E>>,
            service: Option<S>,
        },
        // Serving HTTP/1.
        H1 {
            #[pin]
            conn: Http1Connection<I, S>,
        },
        // Serving HTTP/2.
        H2 {
            #[pin]
            conn: Http2Connection<I, S, E>,
        },
    }
}
441
442impl<I, S, E, B> Connection<'_, I, S, E>
443where
444    S: HttpService<Incoming, ResBody = B>,
445    S::Error: Into<Box<dyn StdError + Send + Sync>>,
446    I: Read + Write + Unpin,
447    B: Body + 'static,
448    B::Error: Into<Box<dyn StdError + Send + Sync>>,
449    E: HttpServerConnExec<S::Future, B>,
450{
451    /// Start a graceful shutdown process for this connection.
452    ///
453    /// This `Connection` should continue to be polled until shutdown can finish.
454    ///
455    /// # Note
456    ///
457    /// This should only be called while the `Connection` future is still pending. If called after
458    /// `Connection::poll` has resolved, this does nothing.
459    pub fn graceful_shutdown(self: Pin<&mut Self>) {
460        match self.project().state.project() {
461            ConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
462            #[cfg(feature = "http1")]
463            ConnStateProj::H1 { conn } => conn.graceful_shutdown(),
464            #[cfg(feature = "http2")]
465            ConnStateProj::H2 { conn } => conn.graceful_shutdown(),
466            #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
467            _ => unreachable!(),
468        }
469    }
470
471    /// Make this Connection static, instead of borrowing from Builder.
472    pub fn into_owned(self) -> Connection<'static, I, S, E>
473    where
474        Builder<E>: Clone,
475    {
476        Connection {
477            state: match self.state {
478                ConnState::ReadVersion {
479                    read_version,
480                    builder,
481                    service,
482                } => ConnState::ReadVersion {
483                    read_version,
484                    service,
485                    builder: Cow::Owned(builder.clone()),
486                },
487                #[cfg(feature = "http1")]
488                ConnState::H1 { conn } => ConnState::H1 { conn },
489                #[cfg(feature = "http2")]
490                ConnState::H2 { conn } => ConnState::H2 { conn },
491                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
492                _ => unreachable!(),
493            },
494        }
495    }
496}
497
impl<I, S, E, B> Future for Connection<'_, I, S, E>
where
    S: Service<Request<Incoming>, Response = Response<B>>,
    S::Future: 'static,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: Body + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    I: Read + Write + Unpin + 'static,
    E: HttpServerConnExec<S::Future, B>,
{
    type Output = Result<()>;

    /// Drives the connection: first finishes reading the protocol version if needed, then
    /// polls the HTTP/1 or HTTP/2 connection that was selected.
    ///
    /// Resolves with an error if reading the version fails, if the detected version is not
    /// compiled in, or if the underlying connection fails.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Loop so that a freshly selected connection is polled in the same call.
        loop {
            let mut this = self.as_mut().project();

            match this.state.as_mut().project() {
                ConnStateProj::ReadVersion {
                    read_version,
                    builder,
                    service,
                } => {
                    let (version, io) = ready!(read_version.poll(cx))?;
                    // The service is stored as `Some` until the version is known.
                    let service = service.take().unwrap();
                    match version {
                        #[cfg(feature = "http1")]
                        Version::H1 => {
                            let conn = builder.http1.serve_connection(io, service);
                            this.state.set(ConnState::H1 { conn });
                        }
                        #[cfg(feature = "http2")]
                        Version::H2 => {
                            let conn = builder.http2.serve_connection(io, service);
                            this.state.set(ConnState::H2 { conn });
                        }
                        // The detected protocol's feature is disabled.
                        #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
                        _ => return Poll::Ready(Err(version.unsupported())),
                    }
                }
                #[cfg(feature = "http1")]
                ConnStateProj::H1 { conn } => {
                    return conn.poll(cx).map_err(Into::into);
                }
                #[cfg(feature = "http2")]
                ConnStateProj::H2 { conn } => {
                    return conn.poll(cx).map_err(Into::into);
                }
                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
                _ => unreachable!(),
            }
        }
    }
}
551
pin_project! {
    /// An upgradable [`Connection`], returned by
    /// [`Builder::serve_connection_with_upgrades`](struct.Builder.html#method.serve_connection_with_upgrades).
    ///
    /// To drive HTTP on this connection this future **must be polled**, typically with
    /// `.await`. If it isn't polled, no progress will be made on this connection.
    #[must_use = "futures do nothing unless polled"]
    pub struct UpgradeableConnection<'a, I, S, E>
    where
        S: HttpService<Incoming>,
    {
        // Current stage: still reading the version, or serving HTTP/1 or HTTP/2.
        #[pin]
        state: UpgradeableConnState<'a, I, S, E>,
    }
}
567
// HTTP/1 connection type that supports upgrades.
#[cfg(feature = "http1")]
type Http1UpgradeableConnection<I, S> = hyper::server::conn::http1::UpgradeableConnection<I, S>;

// Placeholder with the same type parameters when the `http1` feature is disabled.
#[cfg(not(feature = "http1"))]
type Http1UpgradeableConnection<I, S> = (PhantomData<I>, PhantomData<S>);
573
pin_project! {
    // State machine behind `UpgradeableConnection`: version detection followed by the
    // protocol-specific connection.
    #[project = UpgradeableConnStateProj]
    enum UpgradeableConnState<'a, I, S, E>
    where
        S: HttpService<Incoming>,
    {
        // The protocol is not known yet; `service` is taken out once it is.
        ReadVersion {
            #[pin]
            read_version: ReadVersion<I>,
            builder: Cow<'a, Builder<E>>,
            service: Option<S>,
        },
        // Serving HTTP/1 with upgrade support, over the rewindable IO.
        H1 {
            #[pin]
            conn: Http1UpgradeableConnection<Rewind<I>, S>,
        },
        // Serving HTTP/2.
        H2 {
            #[pin]
            conn: Http2Connection<I, S, E>,
        },
    }
}
596
597impl<I, S, E, B> UpgradeableConnection<'_, I, S, E>
598where
599    S: HttpService<Incoming, ResBody = B>,
600    S::Error: Into<Box<dyn StdError + Send + Sync>>,
601    I: Read + Write + Unpin,
602    B: Body + 'static,
603    B::Error: Into<Box<dyn StdError + Send + Sync>>,
604    E: HttpServerConnExec<S::Future, B>,
605{
606    /// Start a graceful shutdown process for this connection.
607    ///
608    /// This `UpgradeableConnection` should continue to be polled until shutdown can finish.
609    ///
610    /// # Note
611    ///
612    /// This should only be called while the `Connection` future is still nothing. pending. If
613    /// called after `UpgradeableConnection::poll` has resolved, this does nothing.
614    pub fn graceful_shutdown(self: Pin<&mut Self>) {
615        match self.project().state.project() {
616            UpgradeableConnStateProj::ReadVersion { read_version, .. } => read_version.cancel(),
617            #[cfg(feature = "http1")]
618            UpgradeableConnStateProj::H1 { conn } => conn.graceful_shutdown(),
619            #[cfg(feature = "http2")]
620            UpgradeableConnStateProj::H2 { conn } => conn.graceful_shutdown(),
621            #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
622            _ => unreachable!(),
623        }
624    }
625
626    /// Make this Connection static, instead of borrowing from Builder.
627    pub fn into_owned(self) -> UpgradeableConnection<'static, I, S, E>
628    where
629        Builder<E>: Clone,
630    {
631        UpgradeableConnection {
632            state: match self.state {
633                UpgradeableConnState::ReadVersion {
634                    read_version,
635                    builder,
636                    service,
637                } => UpgradeableConnState::ReadVersion {
638                    read_version,
639                    service,
640                    builder: Cow::Owned(builder.clone()),
641                },
642                #[cfg(feature = "http1")]
643                UpgradeableConnState::H1 { conn } => UpgradeableConnState::H1 { conn },
644                #[cfg(feature = "http2")]
645                UpgradeableConnState::H2 { conn } => UpgradeableConnState::H2 { conn },
646                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
647                _ => unreachable!(),
648            },
649        }
650    }
651}
652
impl<I, S, E, B> Future for UpgradeableConnection<'_, I, S, E>
where
    S: Service<Request<Incoming>, Response = Response<B>>,
    S::Future: 'static,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    B: Body + 'static,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
    I: Read + Write + Unpin + Send + 'static,
    E: HttpServerConnExec<S::Future, B>,
{
    type Output = Result<()>;

    /// Drives the connection: first finishes reading the protocol version, then polls the
    /// HTTP/1 (with upgrades) or HTTP/2 connection that was selected.
    ///
    /// Resolves with an error if reading the version fails, if the detected version is not
    /// compiled in, or if the underlying connection fails.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Loop so that a freshly selected connection is polled in the same call.
        loop {
            let mut this = self.as_mut().project();

            match this.state.as_mut().project() {
                UpgradeableConnStateProj::ReadVersion {
                    read_version,
                    builder,
                    service,
                } => {
                    let (version, io) = ready!(read_version.poll(cx))?;
                    // The service is stored as `Some` until the version is known.
                    let service = service.take().unwrap();
                    match version {
                        #[cfg(feature = "http1")]
                        Version::H1 => {
                            let conn = builder.http1.serve_connection(io, service).with_upgrades();
                            this.state.set(UpgradeableConnState::H1 { conn });
                        }
                        #[cfg(feature = "http2")]
                        Version::H2 => {
                            let conn = builder.http2.serve_connection(io, service);
                            this.state.set(UpgradeableConnState::H2 { conn });
                        }
                        // The detected protocol's feature is disabled.
                        #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
                        _ => return Poll::Ready(Err(version.unsupported())),
                    }
                }
                #[cfg(feature = "http1")]
                UpgradeableConnStateProj::H1 { conn } => {
                    return conn.poll(cx).map_err(Into::into);
                }
                #[cfg(feature = "http2")]
                UpgradeableConnStateProj::H2 { conn } => {
                    return conn.poll(cx).map_err(Into::into);
                }
                #[cfg(any(not(feature = "http1"), not(feature = "http2")))]
                _ => unreachable!(),
            }
        }
    }
}
706
/// Http1 part of builder.
///
/// A view that mutably borrows a [`Builder`]; every setter forwards to the builder's
/// HTTP/1 configuration and returns `&mut Self` for chaining.
#[cfg(feature = "http1")]
pub struct Http1Builder<'a, E> {
    inner: &'a mut Builder<E>,
}

#[cfg(feature = "http1")]
impl<E> Http1Builder<'_, E> {
    /// Http2 configuration.
    ///
    /// Switches to the HTTP/2 view of the same underlying builder.
    #[cfg(feature = "http2")]
    pub fn http2(&mut self) -> Http2Builder<'_, E> {
        Http2Builder { inner: self.inner }
    }

    /// Set whether the `date` header should be included in HTTP responses.
    ///
    /// Note that including the `date` header is recommended by RFC 7231.
    ///
    /// Default is true.
    pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.auto_date_header(enabled);
        self
    }

    /// Set whether HTTP/1 connections should support half-closures.
    ///
    /// Clients can choose to shutdown their write-side while waiting
    /// for the server to respond. Setting this to `true` will
    /// prevent closing the connection immediately if `read`
    /// detects an EOF in the middle of a request.
    ///
    /// Default is `false`.
    pub fn half_close(&mut self, val: bool) -> &mut Self {
        self.inner.http1.half_close(val);
        self
    }

    /// Enables or disables HTTP/1 keep-alive.
    ///
    /// Default is true.
    pub fn keep_alive(&mut self, val: bool) -> &mut Self {
        self.inner.http1.keep_alive(val);
        self
    }

    /// Set whether HTTP/1 connections will write header names as title case at
    /// the socket level.
    ///
    /// Note that this setting does not affect HTTP/2.
    ///
    /// Default is false.
    pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.title_case_headers(enabled);
        self
    }

    /// Set whether HTTP/1 connections will silently ignore malformed header lines.
    ///
    /// If this is enabled and a header line does not start with a valid header
    /// name, or does not include a colon at all, the line will be silently ignored
    /// and no error will be reported.
    ///
    /// Default is false.
    pub fn ignore_invalid_headers(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.ignore_invalid_headers(enabled);
        self
    }

    /// Set whether to support preserving original header cases.
    ///
    /// Currently, this will record the original cases received, and store them
    /// in a private extension on the `Request`. It will also look for and use
    /// such an extension in any provided `Response`.
    ///
    /// Since the relevant extension is still private, there is no way to
    /// interact with the original cases. The only effect this can have now is
    /// to forward the cases in a proxy-like fashion.
    ///
    /// Note that this setting does not affect HTTP/2.
    ///
    /// Default is false.
    pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.preserve_header_case(enabled);
        self
    }

    /// Set the maximum number of headers.
    ///
    /// When a request is received, the parser will reserve a buffer to store headers for optimal
    /// performance.
    ///
    /// If server receives more headers than the buffer size, it responds to the client with
    /// "431 Request Header Fields Too Large".
    ///
    /// The headers are allocated on the stack by default, which has higher performance. After
    /// setting this value, headers will be allocated in heap memory, that is, heap memory
    /// allocation will occur for each request, and there will be a performance drop of about 5%.
    ///
    /// Note that this setting does not affect HTTP/2.
    ///
    /// Default is 100.
    pub fn max_headers(&mut self, val: usize) -> &mut Self {
        self.inner.http1.max_headers(val);
        self
    }

    /// Set a timeout for reading client request headers. If a client does not
    /// transmit the entire header within this time, the connection is closed.
    ///
    /// Requires a [`Timer`] set by [`Http1Builder::timer`] to take effect. Panics if
    /// `header_read_timeout` is configured without a [`Timer`].
    ///
    /// Pass `None` to disable.
    ///
    /// Default is currently 30 seconds, but do not depend on that.
    pub fn header_read_timeout(&mut self, read_timeout: impl Into<Option<Duration>>) -> &mut Self {
        self.inner.http1.header_read_timeout(read_timeout);
        self
    }

    /// Set whether HTTP/1 connections should try to use vectored writes,
    /// or always flatten into a single buffer.
    ///
    /// Note that setting this to false may mean more copies of body data,
    /// but may also improve performance when an IO transport doesn't
    /// support vectored writes well, such as most TLS implementations.
    ///
    /// Setting this to true will force hyper to use queued strategy
    /// which may eliminate unnecessary cloning on some TLS backends
    ///
    /// Default is `auto`. In this mode hyper will try to guess which
    /// mode to use
    pub fn writev(&mut self, val: bool) -> &mut Self {
        self.inner.http1.writev(val);
        self
    }

    /// Set the maximum buffer size for the connection.
    ///
    /// Default is ~400kb.
    ///
    /// # Panics
    ///
    /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
    pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
        self.inner.http1.max_buf_size(max);
        self
    }

    /// Aggregates flushes to better support pipelined responses.
    ///
    /// Experimental, may have bugs.
    ///
    /// Default is false.
    pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
        self.inner.http1.pipeline_flush(enabled);
        self
    }

    /// Set the timer used in background tasks.
    pub fn timer<M>(&mut self, timer: M) -> &mut Self
    where
        M: Timer + Send + Sync + 'static,
    {
        self.inner.http1.timer(timer);
        self
    }

    /// Bind a connection together with a [`Service`].
    ///
    /// Delegates to [`Builder::serve_connection`] on the underlying builder and awaits the
    /// resulting connection to completion.
    // A single definition serves every feature combination: `HttpServerConnExec` is
    // declared both with and without the `http2` feature, and without it the trait is
    // implemented for every type, so the bound adds no restriction there.
    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + 'static,
        E: HttpServerConnExec<S::Future, B>,
    {
        self.inner.serve_connection(io, service).await
    }

    /// Bind a connection together with a [`Service`], with the ability to
    /// handle HTTP upgrades. This requires that the IO object implements
    /// `Send`.
    ///
    /// Delegates to [`Builder::serve_connection_with_upgrades`] on the underlying builder.
    // Not gated on the `http2` feature: the method it delegates to is available in every
    // configuration, so HTTP/1-only builds can use it as well.
    pub fn serve_connection_with_upgrades<I, S, B>(
        &self,
        io: I,
        service: S,
    ) -> UpgradeableConnection<'_, I, S, E>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + Send + 'static,
        E: HttpServerConnExec<S::Future, B>,
    {
        self.inner.serve_connection_with_upgrades(io, service)
    }
}
925
/// The HTTP/2 half of the connection builder.
///
/// It holds a mutable borrow of the parent [`Builder`], so every setting
/// applied through this type is written to that builder.
#[cfg(feature = "http2")]
pub struct Http2Builder<'a, E> {
    // Exclusive borrow of the builder being configured.
    inner: &'a mut Builder<E>,
}
931
#[cfg(feature = "http2")]
impl<E> Http2Builder<'_, E> {
    /// Switches over to configuring the HTTP/1 half of the same builder.
    #[cfg(feature = "http1")]
    pub fn http1(&mut self) -> Http1Builder<'_, E> {
        let inner = &mut *self.inner;
        Http1Builder { inner }
    }

    /// Limits how many pending reset streams are tolerated before a GOAWAY
    /// is sent.
    ///
    /// When left unset, the default comes from the
    /// [`h2` crate](https://crates.io/crates/h2); as of v0.4.0 that is 20.
    ///
    /// More background: <https://github.com/hyperium/hyper/issues/2877>.
    pub fn max_pending_accept_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.max_pending_accept_reset_streams(max);
        self
    }

    /// Limits how many locally reset streams are tolerated before a GOAWAY
    /// is sent.
    ///
    /// When left unset, hyper applies a default, currently 1024.
    ///
    /// Supplying `None` removes the limit entirely. That is not advised, as
    /// it can potentially expose servers to DOS vulnerabilities.
    ///
    /// More background: <https://rustsec.org/advisories/RUSTSEC-2024-0003.html>.
    pub fn max_local_error_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.max_local_error_reset_streams(max);
        self
    }

    /// Sets the HTTP2 [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option, which
    /// governs stream-level flow control.
    ///
    /// `None` is a no-op. When left unset, hyper picks a default.
    ///
    /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
    pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.initial_stream_window_size(sz);
        self
    }

    /// Sets the maximum connection-level flow control window for HTTP2.
    ///
    /// `None` is a no-op. When left unset, hyper picks a default.
    pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.initial_connection_window_size(sz);
        self
    }

    /// Turns adaptive flow control on or off.
    ///
    /// When enabled, it overrides the limits configured through
    /// [`initial_stream_window_size`](Self::initial_stream_window_size) and
    /// [`initial_connection_window_size`](Self::initial_connection_window_size).
    pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.adaptive_window(enabled);
        self
    }

    /// Sets the largest frame size to use for HTTP2.
    ///
    /// `None` is a no-op. When left unset, hyper picks a default.
    pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.max_frame_size(sz);
        self
    }

    /// Sets the HTTP2 [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for
    /// connections.
    ///
    /// The default is 200; passing `None` removes any limit.
    ///
    /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
    pub fn max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.max_concurrent_streams(max);
        self
    }

    /// Sets how often HTTP2 Ping frames are sent to keep a connection alive.
    ///
    /// Passing `None` disables HTTP2 keep-alive, which is also the current
    /// default.
    pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.keep_alive_interval(interval);
        self
    }

    /// Sets how long to wait for a keep-alive ping to be acknowledged.
    ///
    /// A ping that is not acknowledged within this timeout causes the
    /// connection to be closed. The setting has no effect while
    /// [`keep_alive_interval`](Self::keep_alive_interval) is disabled.
    ///
    /// The default is 20 seconds.
    pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.keep_alive_timeout(timeout);
        self
    }

    /// Sets the maximum write buffer size for each HTTP/2 stream.
    ///
    /// The default is currently about 400KB, but may change.
    ///
    /// # Panics
    ///
    /// `max` must not exceed `u32::MAX`.
    pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.max_send_buf_size(max);
        self
    }

    /// Turns on the [extended CONNECT protocol].
    ///
    /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
    pub fn enable_connect_protocol(&mut self) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.enable_connect_protocol();
        self
    }

    /// Sets the maximum size of received header frames.
    ///
    /// The default is currently about 16MB, but may change.
    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.max_header_list_size(max);
        self
    }

    /// Installs the timer that background tasks will use.
    pub fn timer<M>(&mut self, timer: M) -> &mut Self
    where
        M: Timer + Send + Sync + 'static,
    {
        let h2 = &mut self.inner.http2;
        h2.timer(timer);
        self
    }

    /// Controls whether HTTP responses include the `date` header.
    ///
    /// RFC 7231 recommends including it, and it is on (`true`) by default.
    pub fn auto_date_header(&mut self, enabled: bool) -> &mut Self {
        let h2 = &mut self.inner.http2;
        h2.auto_date_header(enabled);
        self
    }

    /// Binds `io` to `service` and drives the resulting connection.
    ///
    /// The call is forwarded to the underlying builder's `serve_connection`
    /// and the connection it returns is awaited.
    pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + 'static,
        E: HttpServerConnExec<S::Future, B>,
    {
        let connection = self.inner.serve_connection(io, service);
        connection.await
    }

    /// Binds `io` to `service` in a way that can also handle HTTP upgrades.
    ///
    /// Unlike plain `serve_connection`, the IO object must additionally
    /// implement `Send`.
    pub fn serve_connection_with_upgrades<I, S, B>(
        &self,
        io: I,
        service: S,
    ) -> UpgradeableConnection<'_, I, S, E>
    where
        S: Service<Request<Incoming>, Response = Response<B>>,
        S::Future: 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        B: Body + 'static,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin + Send + 'static,
        E: HttpServerConnExec<S::Future, B>,
    {
        // Shared reborrow of the wrapped builder; the returned connection
        // borrows from it for as long as `self` is borrowed.
        let builder: &Builder<E> = &*self.inner;
        builder.serve_connection_with_upgrades(io, service)
    }
}
1127
#[cfg(test)]
mod tests {
    use crate::{
        rt::{TokioExecutor, TokioIo},
        server::conn::auto,
    };
    use http::{Request, Response};
    use http_body::Body;
    use http_body_util::{BodyExt, Empty, Full};
    use hyper::{body, body::Bytes, client, service::service_fn};
    use std::{convert::Infallible, error::Error as StdError, net::SocketAddr, time::Duration};
    use tokio::{
        net::{TcpListener, TcpStream},
        pin,
    };

    /// Payload the `hello` service sends back for every request.
    const BODY: &[u8] = b"Hello, world!";

    /// The builder can be configured both as one chained expression and
    /// through a named variable.
    #[test]
    fn configuration() {
        // Chained form.
        auto::Builder::new(TokioExecutor::new())
            .http1()
            .keep_alive(true)
            .http2()
            .keep_alive_interval(None);
        //  .serve_connection(io, service);

        // Step-by-step form on a named builder.
        let mut builder = auto::Builder::new(TokioExecutor::new());

        builder.http1().keep_alive(true);
        builder.http2().keep_alive_interval(None);
        // builder.serve_connection(io, service);
    }

    #[test]
    #[cfg(feature = "http1")]
    fn title_case_headers_configuration() {
        // `title_case_headers` is available directly on the main builder...
        auto::Builder::new(TokioExecutor::new()).title_case_headers(true);

        // ...and chains with other configuration calls.
        auto::Builder::new(TokioExecutor::new())
            .title_case_headers(true)
            .http1_only();
    }

    #[test]
    #[cfg(feature = "http1")]
    fn preserve_header_case_configuration() {
        // `preserve_header_case` is available directly on the main builder...
        auto::Builder::new(TokioExecutor::new()).preserve_header_case(true);

        // ...and chains with other configuration calls.
        auto::Builder::new(TokioExecutor::new())
            .preserve_header_case(true)
            .http1_only();
    }

    #[cfg(not(miri))]
    #[tokio::test]
    async fn http1() {
        let addr = start_server(false, false).await;
        let mut sender = connect_h1(addr).await;

        let response = sender.send_request(empty_request()).await.unwrap();

        assert_eq!(read_body(response).await, BODY);
    }

    #[cfg(not(miri))]
    #[tokio::test]
    async fn http2() {
        let addr = start_server(false, false).await;
        let mut sender = connect_h2(addr).await;

        let response = sender.send_request(empty_request()).await.unwrap();

        assert_eq!(read_body(response).await, BODY);
    }

    #[cfg(not(miri))]
    #[tokio::test]
    async fn http2_only() {
        let addr = start_server(false, true).await;
        let mut sender = connect_h2(addr).await;

        let response = sender.send_request(empty_request()).await.unwrap();

        assert_eq!(read_body(response).await, BODY);
    }

    #[cfg(not(miri))]
    #[tokio::test]
    async fn http2_only_fail_if_client_is_http1() {
        let addr = start_server(false, true).await;
        let mut sender = connect_h1(addr).await;

        let outcome = sender.send_request(empty_request()).await;
        let _ = outcome.expect_err("should fail");
    }

    #[cfg(not(miri))]
    #[tokio::test]
    async fn http1_only() {
        let addr = start_server(true, false).await;
        let mut sender = connect_h1(addr).await;

        let response = sender.send_request(empty_request()).await.unwrap();

        assert_eq!(read_body(response).await, BODY);
    }

    #[cfg(not(miri))]
    #[tokio::test]
    async fn http1_only_fail_if_client_is_http2() {
        let addr = start_server(true, false).await;
        let mut sender = connect_h2(addr).await;

        let outcome = sender.send_request(empty_request()).await;
        let _ = outcome.expect_err("should fail");
    }

    #[cfg(not(miri))]
    #[tokio::test]
    async fn graceful_shutdown() {
        let bind_addr: SocketAddr = ([127, 0, 0, 1], 0).into();
        let listener = TcpListener::bind(bind_addr).await.unwrap();
        let listener_addr = listener.local_addr().unwrap();

        // Accept on a background task so this test can play the client.
        let accept_task = tokio::spawn(async move { listener.accept().await.unwrap() });
        // Open the connection but never send headers or anything else.
        let _client = TcpStream::connect(listener_addr).await.unwrap();

        let (accepted, _peer) = accept_task.await.unwrap();
        let io = TokioIo::new(accepted);
        let builder = auto::Builder::new(TokioExecutor::new());
        let connection = builder.serve_connection(io, service_fn(hello));

        pin!(connection);

        connection.as_mut().graceful_shutdown();

        let outcome = tokio::time::timeout(Duration::from_millis(200), connection).await;
        let error = outcome
            .expect("Connection should have finished in a timely manner after graceful shutdown.")
            .expect_err("Connection should have been interrupted.");

        let io_error = error
            .downcast_ref::<std::io::Error>()
            .expect("The error should have been `std::io::Error`.");
        assert_eq!(io_error.kind(), std::io::ErrorKind::Interrupted);
    }

    /// Builds the body-less request every round-trip test sends.
    fn empty_request() -> Request<Empty<Bytes>> {
        Request::new(Empty::<Bytes>::new())
    }

    /// Collects the whole response body into one `Bytes` value.
    async fn read_body(response: Response<body::Incoming>) -> Bytes {
        let collected = response.into_body().collect().await.unwrap();
        collected.to_bytes()
    }

    /// Opens an HTTP/1 client connection to `addr`, spawns its driver task,
    /// and returns the request sender.
    async fn connect_h1<B>(addr: SocketAddr) -> client::conn::http1::SendRequest<B>
    where
        B: Body + Send + 'static,
        B::Data: Send,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
    {
        let tcp = TcpStream::connect(addr).await.unwrap();
        let io = TokioIo::new(tcp);
        let (sender, connection) = client::conn::http1::handshake(io).await.unwrap();

        // The connection is driven on its own task.
        tokio::spawn(connection);

        sender
    }

    /// Opens an HTTP/2 client connection to `addr`, spawns its driver task,
    /// and returns the request sender.
    async fn connect_h2<B>(addr: SocketAddr) -> client::conn::http2::SendRequest<B>
    where
        B: Body + Unpin + Send + 'static,
        B::Data: Send,
        B::Error: Into<Box<dyn StdError + Send + Sync>>,
    {
        let tcp = TcpStream::connect(addr).await.unwrap();
        let io = TokioIo::new(tcp);
        let client_builder = client::conn::http2::Builder::new(TokioExecutor::new());
        let (sender, connection) = client_builder.handshake(io).await.unwrap();

        // The connection is driven on its own task.
        tokio::spawn(connection);

        sender
    }

    /// Starts a background server on an OS-assigned loopback port and
    /// returns its address.
    ///
    /// `h1_only` takes precedence over `h2_only`; with neither set the
    /// server serves the connection with upgrade support.
    async fn start_server(h1_only: bool, h2_only: bool) -> SocketAddr {
        let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))
            .await
            .unwrap();

        let local_addr = listener.local_addr().unwrap();

        tokio::spawn(async move {
            loop {
                let (accepted, _) = listener.accept().await.unwrap();
                let stream = TokioIo::new(accepted);
                tokio::task::spawn(async move {
                    let mut builder = auto::Builder::new(TokioExecutor::new());
                    let outcome = match (h1_only, h2_only) {
                        (true, _) => {
                            builder = builder.http1_only();
                            builder.serve_connection(stream, service_fn(hello)).await
                        }
                        (false, true) => {
                            builder = builder.http2_only();
                            builder.serve_connection(stream, service_fn(hello)).await
                        }
                        (false, false) => {
                            builder
                                .http2()
                                .max_header_list_size(4096)
                                .serve_connection_with_upgrades(stream, service_fn(hello))
                                .await
                        }
                    };
                    outcome.unwrap();
                });
            }
        });

        local_addr
    }

    /// Test service: answers every request with [`BODY`].
    async fn hello(_req: Request<body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
        let payload = Full::new(Bytes::from(BODY));
        Ok(Response::new(payload))
    }
}