hyper/client/
conn.rs

1//! Lower-level client connection API.
2//!
3//! The types in this module are to provide a lower-level API based around a
4//! single connection. Connecting to a host, pooling connections, and the like
5//! are not handled at this level. This module provides the building blocks to
6//! customize those things externally.
7//!
8//! If don't have need to manage connections yourself, consider using the
9//! higher-level [Client](super) API.
10//!
11//! ## Example
12//! A simple example that uses the `SendRequest` struct to talk HTTP over a Tokio TCP stream
13//! ```no_run
14//! # #[cfg(all(feature = "client", feature = "http1", feature = "runtime"))]
15//! # mod rt {
16//! use tower::ServiceExt;
17//! use http::{Request, StatusCode};
18//! use hyper::{client::conn, Body};
19//! use tokio::net::TcpStream;
20//!
21//! #[tokio::main]
22//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
23//!     let target_stream = TcpStream::connect("example.com:80").await?;
24//!
25//!     let (mut request_sender, connection) = conn::handshake(target_stream).await?;
26//!
27//!     // spawn a task to poll the connection and drive the HTTP state
28//!     tokio::spawn(async move {
29//!         if let Err(e) = connection.await {
30//!             eprintln!("Error in connection: {}", e);
31//!         }
32//!     });
33//!
34//!     let request = Request::builder()
35//!         // We need to manually add the host header because SendRequest does not
36//!         .header("Host", "example.com")
37//!         .method("GET")
38//!         .body(Body::from(""))?;
39//!     let response = request_sender.send_request(request).await?;
40//!     assert!(response.status() == StatusCode::OK);
41//!
42//!     // To send via the same connection again, it may not work as it may not be ready,
43//!     // so we have to wait until the request_sender becomes ready.
44//!     request_sender.ready().await?;
45//!     let request = Request::builder()
46//!         .header("Host", "example.com")
47//!         .method("GET")
48//!         .body(Body::from(""))?;
49//!     let response = request_sender.send_request(request).await?;
50//!     assert!(response.status() == StatusCode::OK);
51//!     Ok(())
52//! }
53//!
54//! # }
55//! ```
56
57#[cfg(all(feature = "backports", feature = "http1"))]
58pub mod http1;
59#[cfg(all(feature = "backports", feature = "http2"))]
60pub mod http2;
61
62#[cfg(not(all(feature = "http1", feature = "http2")))]
63use std::convert::Infallible;
64use std::error::Error as StdError;
65use std::fmt;
66use std::future::Future;
67#[cfg(not(all(feature = "http1", feature = "http2")))]
68use std::marker::PhantomData;
69use std::marker::Unpin;
70use std::pin::Pin;
71use std::sync::Arc;
72use std::task::{Context, Poll};
73#[cfg(all(feature = "runtime", feature = "http2"))]
74use std::time::Duration;
75
76use bytes::Bytes;
77use futures_util::future::{self, Either, FutureExt as _};
78use httparse::ParserConfig;
79use pin_project_lite::pin_project;
80use tokio::io::{AsyncRead, AsyncWrite};
81use tower_service::Service;
82use tracing::{debug, trace};
83
84use super::dispatch;
85use crate::body::HttpBody;
86use crate::common::exec::{BoxSendFuture, Exec};
87use crate::proto;
88use crate::rt::Executor;
89#[cfg(feature = "http1")]
90use crate::upgrade::Upgraded;
91use crate::{Body, Request, Response};
92
93#[cfg(feature = "http1")]
94type Http1Dispatcher<T, B> =
95    proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
96
97#[cfg(not(feature = "http1"))]
98type Http1Dispatcher<T, B> = (Infallible, PhantomData<(T, Pin<Box<B>>)>);
99
100#[cfg(feature = "http2")]
101type Http2ClientTask<B> = proto::h2::ClientTask<B>;
102
103#[cfg(not(feature = "http2"))]
104type Http2ClientTask<B> = (Infallible, PhantomData<Pin<Box<B>>>);
105
106pin_project! {
107    #[project = ProtoClientProj]
108    enum ProtoClient<T, B>
109    where
110        B: HttpBody,
111    {
112        H1 {
113            #[pin]
114            h1: Http1Dispatcher<T, B>,
115        },
116        H2 {
117            #[pin]
118            h2: Http2ClientTask<B>,
119        },
120    }
121}
122
123/// Returns a handshake future over some IO.
124///
125/// This is a shortcut for `Builder::new().handshake(io)`.
126/// See [`client::conn`](crate::client::conn) for more.
127#[cfg_attr(
128    feature = "deprecated",
129    deprecated(
130        note = "This function will be replaced with `client::conn::http1::handshake` and `client::conn::http2::handshake` in 1.0, enable the \"backports\" feature to use them now."
131    )
132)]
133#[cfg_attr(feature = "deprecated", allow(deprecated))]
134pub async fn handshake<T>(
135    io: T,
136) -> crate::Result<(SendRequest<crate::Body>, Connection<T, crate::Body>)>
137where
138    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
139{
140    #[allow(deprecated)]
141    Builder::new().handshake(io).await
142}
143
144/// The sender side of an established connection.
145#[cfg_attr(
146    feature = "deprecated",
147    deprecated(
148        note = "This type will be replaced with `client::conn::http1::SendRequest` and `client::conn::http2::SendRequest` in 1.0, enable the \"backports\" feature to use them now."
149    )
150)]
151pub struct SendRequest<B> {
152    dispatch: dispatch::Sender<Request<B>, Response<Body>>,
153}
154
155/// A future that processes all HTTP state for the IO object.
156///
157/// In most cases, this should just be spawned into an executor, so that it
158/// can process incoming and outgoing messages, notice hangups, and the like.
159#[must_use = "futures do nothing unless polled"]
160#[cfg_attr(
161    feature = "deprecated",
162    deprecated(
163        note = "This type will be replaced with `client::conn::http1::Connection` and `client::conn::http2::Connection` in 1.0, enable the \"backports\" feature to use them now."
164    )
165)]
166pub struct Connection<T, B>
167where
168    T: AsyncRead + AsyncWrite + Send + 'static,
169    B: HttpBody + 'static,
170{
171    inner: Option<ProtoClient<T, B>>,
172}
173
174/// A builder to configure an HTTP connection.
175///
176/// After setting options, the builder is used to create a handshake future.
177#[derive(Clone, Debug)]
178#[cfg_attr(
179    feature = "deprecated",
180    deprecated(
181        note = "This type will be replaced with `client::conn::http1::Builder` and `client::conn::http2::Builder` in 1.0, enable the \"backports\" feature to use them now."
182    )
183)]
184pub struct Builder {
185    pub(super) exec: Exec,
186    h09_responses: bool,
187    h1_parser_config: ParserConfig,
188    h1_writev: Option<bool>,
189    h1_title_case_headers: bool,
190    h1_preserve_header_case: bool,
191    #[cfg(feature = "ffi")]
192    h1_preserve_header_order: bool,
193    h1_read_buf_exact_size: Option<usize>,
194    h1_max_buf_size: Option<usize>,
195    #[cfg(feature = "ffi")]
196    h1_headers_raw: bool,
197    #[cfg(feature = "http2")]
198    h2_builder: proto::h2::client::Config,
199    version: Proto,
200}
201
202#[derive(Clone, Debug)]
203enum Proto {
204    #[cfg(feature = "http1")]
205    Http1,
206    #[cfg(feature = "http2")]
207    Http2,
208}
209
210/// A future returned by `SendRequest::send_request`.
211///
212/// Yields a `Response` if successful.
213#[must_use = "futures do nothing unless polled"]
214pub struct ResponseFuture {
215    inner: ResponseFutureState,
216}
217
218enum ResponseFutureState {
219    Waiting(dispatch::Promise<Response<Body>>),
220    // Option is to be able to `take()` it in `poll`
221    Error(Option<crate::Error>),
222}
223
224/// Deconstructed parts of a `Connection`.
225///
226/// This allows taking apart a `Connection` at a later time, in order to
227/// reclaim the IO object, and additional related pieces.
228#[derive(Debug)]
229pub struct Parts<T> {
230    /// The original IO object used in the handshake.
231    pub io: T,
232    /// A buffer of bytes that have been read but not processed as HTTP.
233    ///
234    /// For instance, if the `Connection` is used for an HTTP upgrade request,
235    /// it is possible the server sent back the first bytes of the new protocol
236    /// along with the response upgrade.
237    ///
238    /// You will want to check for any existing bytes if you plan to continue
239    /// communicating on the IO object.
240    pub read_buf: Bytes,
241    _inner: (),
242}
243
244// ========== internal client api
245
246// A `SendRequest` that can be cloned to send HTTP2 requests.
247// private for now, probably not a great idea of a type...
248#[must_use = "futures do nothing unless polled"]
249#[cfg(feature = "http2")]
250pub(super) struct Http2SendRequest<B> {
251    dispatch: dispatch::UnboundedSender<Request<B>, Response<Body>>,
252}
253
254// ===== impl SendRequest
255
256#[cfg_attr(feature = "deprecated", allow(deprecated))]
257impl<B> SendRequest<B> {
258    /// Polls to determine whether this sender can be used yet for a request.
259    ///
260    /// If the associated connection is closed, this returns an Error.
261    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
262        self.dispatch.poll_ready(cx)
263    }
264
265    pub(super) async fn when_ready(self) -> crate::Result<Self> {
266        let mut me = Some(self);
267        future::poll_fn(move |cx| {
268            ready!(me.as_mut().unwrap().poll_ready(cx))?;
269            Poll::Ready(Ok(me.take().unwrap()))
270        })
271        .await
272    }
273
274    pub(super) fn is_ready(&self) -> bool {
275        self.dispatch.is_ready()
276    }
277
278    pub(super) fn is_closed(&self) -> bool {
279        self.dispatch.is_closed()
280    }
281
282    #[cfg(feature = "http2")]
283    pub(super) fn into_http2(self) -> Http2SendRequest<B> {
284        Http2SendRequest {
285            dispatch: self.dispatch.unbound(),
286        }
287    }
288}
289
290#[cfg_attr(feature = "deprecated", allow(deprecated))]
291impl<B> SendRequest<B>
292where
293    B: HttpBody + 'static,
294{
295    /// Sends a `Request` on the associated connection.
296    ///
297    /// Returns a future that if successful, yields the `Response`.
298    ///
299    /// # Note
300    ///
301    /// There are some key differences in what automatic things the `Client`
302    /// does for you that will not be done here:
303    ///
304    /// - `Client` requires absolute-form `Uri`s, since the scheme and
305    ///   authority are needed to connect. They aren't required here.
306    /// - Since the `Client` requires absolute-form `Uri`s, it can add
307    ///   the `Host` header based on it. You must add a `Host` header yourself
308    ///   before calling this method.
309    /// - Since absolute-form `Uri`s are not required, if received, they will
310    ///   be serialized as-is.
311    ///
312    /// # Example
313    ///
314    /// ```
315    /// # use http::header::HOST;
316    /// # use hyper::client::conn::SendRequest;
317    /// # use hyper::Body;
318    /// use hyper::Request;
319    ///
320    /// # async fn doc(mut tx: SendRequest<Body>) -> hyper::Result<()> {
321    /// // build a Request
322    /// let req = Request::builder()
323    ///     .uri("/foo/bar")
324    ///     .header(HOST, "hyper.rs")
325    ///     .body(Body::empty())
326    ///     .unwrap();
327    ///
328    /// // send it and await a Response
329    /// let res = tx.send_request(req).await?;
330    /// // assert the Response
331    /// assert!(res.status().is_success());
332    /// # Ok(())
333    /// # }
334    /// # fn main() {}
335    /// ```
336    pub fn send_request(&mut self, req: Request<B>) -> ResponseFuture {
337        let inner = match self.dispatch.send(req) {
338            Ok(rx) => ResponseFutureState::Waiting(rx),
339            Err(_req) => {
340                debug!("connection was not ready");
341                let err = crate::Error::new_canceled().with("connection was not ready");
342                ResponseFutureState::Error(Some(err))
343            }
344        };
345
346        ResponseFuture { inner }
347    }
348
349    pub(super) fn send_request_retryable(
350        &mut self,
351        req: Request<B>,
352    ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
353    where
354        B: Send,
355    {
356        match self.dispatch.try_send(req) {
357            Ok(rx) => {
358                Either::Left(rx.then(move |res| {
359                    match res {
360                        Ok(Ok(res)) => future::ok(res),
361                        Ok(Err(err)) => future::err(err),
362                        // this is definite bug if it happens, but it shouldn't happen!
363                        Err(_) => panic!("dispatch dropped without returning error"),
364                    }
365                }))
366            }
367            Err(req) => {
368                debug!("connection was not ready");
369                let err = crate::Error::new_canceled().with("connection was not ready");
370                Either::Right(future::err((err, Some(req))))
371            }
372        }
373    }
374}
375
376#[cfg_attr(feature = "deprecated", allow(deprecated))]
377impl<B> Service<Request<B>> for SendRequest<B>
378where
379    B: HttpBody + 'static,
380{
381    type Response = Response<Body>;
382    type Error = crate::Error;
383    type Future = ResponseFuture;
384
385    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
386        self.poll_ready(cx)
387    }
388
389    fn call(&mut self, req: Request<B>) -> Self::Future {
390        self.send_request(req)
391    }
392}
393
394#[cfg_attr(feature = "deprecated", allow(deprecated))]
395impl<B> fmt::Debug for SendRequest<B> {
396    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
397        f.debug_struct("SendRequest").finish()
398    }
399}
400
401// ===== impl Http2SendRequest
402
403#[cfg(feature = "http2")]
404impl<B> Http2SendRequest<B> {
405    pub(super) fn is_ready(&self) -> bool {
406        self.dispatch.is_ready()
407    }
408
409    pub(super) fn is_closed(&self) -> bool {
410        self.dispatch.is_closed()
411    }
412}
413
414#[cfg(feature = "http2")]
415impl<B> Http2SendRequest<B>
416where
417    B: HttpBody + 'static,
418{
419    pub(super) fn send_request_retryable(
420        &mut self,
421        req: Request<B>,
422    ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>>
423    where
424        B: Send,
425    {
426        match self.dispatch.try_send(req) {
427            Ok(rx) => {
428                Either::Left(rx.then(move |res| {
429                    match res {
430                        Ok(Ok(res)) => future::ok(res),
431                        Ok(Err(err)) => future::err(err),
432                        // this is definite bug if it happens, but it shouldn't happen!
433                        Err(_) => panic!("dispatch dropped without returning error"),
434                    }
435                }))
436            }
437            Err(req) => {
438                debug!("connection was not ready");
439                let err = crate::Error::new_canceled().with("connection was not ready");
440                Either::Right(future::err((err, Some(req))))
441            }
442        }
443    }
444}
445
446#[cfg(feature = "http2")]
447impl<B> fmt::Debug for Http2SendRequest<B> {
448    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
449        f.debug_struct("Http2SendRequest").finish()
450    }
451}
452
453#[cfg(feature = "http2")]
454impl<B> Clone for Http2SendRequest<B> {
455    fn clone(&self) -> Self {
456        Http2SendRequest {
457            dispatch: self.dispatch.clone(),
458        }
459    }
460}
461
462// ===== impl Connection
463
464#[cfg_attr(feature = "deprecated", allow(deprecated))]
465impl<T, B> Connection<T, B>
466where
467    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
468    B: HttpBody + Unpin + Send + 'static,
469    B::Data: Send,
470    B::Error: Into<Box<dyn StdError + Send + Sync>>,
471{
472    /// Return the inner IO object, and additional information.
473    ///
474    /// Only works for HTTP/1 connections. HTTP/2 connections will panic.
475    pub fn into_parts(self) -> Parts<T> {
476        match self.inner.expect("already upgraded") {
477            #[cfg(feature = "http1")]
478            ProtoClient::H1 { h1 } => {
479                let (io, read_buf, _) = h1.into_inner();
480                Parts {
481                    io,
482                    read_buf,
483                    _inner: (),
484                }
485            }
486            ProtoClient::H2 { .. } => {
487                panic!("http2 cannot into_inner");
488            }
489
490            #[cfg(not(feature = "http1"))]
491            ProtoClient::H1 { h1 } => match h1.0 {},
492        }
493    }
494
495    /// Poll the connection for completion, but without calling `shutdown`
496    /// on the underlying IO.
497    ///
498    /// This is useful to allow running a connection while doing an HTTP
499    /// upgrade. Once the upgrade is completed, the connection would be "done",
500    /// but it is not desired to actually shutdown the IO object. Instead you
501    /// would take it back using `into_parts`.
502    ///
503    /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
504    /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
505    /// to work with this function; or use the `without_shutdown` wrapper.
506    pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
507        match *self.inner.as_mut().expect("already upgraded") {
508            #[cfg(feature = "http1")]
509            ProtoClient::H1 { ref mut h1 } => h1.poll_without_shutdown(cx),
510            #[cfg(feature = "http2")]
511            ProtoClient::H2 { ref mut h2, .. } => Pin::new(h2).poll(cx).map_ok(|_| ()),
512
513            #[cfg(not(feature = "http1"))]
514            ProtoClient::H1 { ref mut h1 } => match h1.0 {},
515            #[cfg(not(feature = "http2"))]
516            ProtoClient::H2 { ref mut h2, .. } => match h2.0 {},
517        }
518    }
519
520    /// Prevent shutdown of the underlying IO object at the end of service the request,
521    /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
522    pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<T>>> {
523        let mut conn = Some(self);
524        future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
525            ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
526            Poll::Ready(Ok(conn.take().unwrap().into_parts()))
527        })
528    }
529
530    /// Returns whether the [extended CONNECT protocol][1] is enabled or not.
531    ///
532    /// This setting is configured by the server peer by sending the
533    /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
534    /// This method returns the currently acknowledged value received from the
535    /// remote.
536    ///
537    /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
538    /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
539    #[cfg(feature = "http2")]
540    pub fn http2_is_extended_connect_protocol_enabled(&self) -> bool {
541        match self.inner.as_ref().unwrap() {
542            ProtoClient::H1 { .. } => false,
543            ProtoClient::H2 { h2 } => h2.is_extended_connect_protocol_enabled(),
544        }
545    }
546}
547
548#[cfg_attr(feature = "deprecated", allow(deprecated))]
549impl<T, B> Future for Connection<T, B>
550where
551    T: AsyncRead + AsyncWrite + Unpin + Send,
552    B: HttpBody + Send + 'static,
553    B::Data: Send,
554    B::Error: Into<Box<dyn StdError + Send + Sync>>,
555{
556    type Output = crate::Result<()>;
557
558    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
559        match ready!(Pin::new(self.inner.as_mut().unwrap()).poll(cx))? {
560            proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
561            #[cfg(feature = "http1")]
562            proto::Dispatched::Upgrade(pending) => match self.inner.take() {
563                Some(ProtoClient::H1 { h1 }) => {
564                    let (io, buf, _) = h1.into_inner();
565                    pending.fulfill(Upgraded::new(io, buf));
566                    Poll::Ready(Ok(()))
567                }
568                _ => {
569                    drop(pending);
570                    unreachable!("Upgrade expects h1");
571                }
572            },
573        }
574    }
575}
576
577#[cfg_attr(feature = "deprecated", allow(deprecated))]
578impl<T, B> fmt::Debug for Connection<T, B>
579where
580    T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static,
581    B: HttpBody + 'static,
582{
583    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
584        f.debug_struct("Connection").finish()
585    }
586}
587
588// ===== impl Builder
589
590#[cfg_attr(feature = "deprecated", allow(deprecated))]
591impl Builder {
592    /// Creates a new connection builder.
593    #[inline]
594    pub fn new() -> Builder {
595        Builder {
596            exec: Exec::Default,
597            h09_responses: false,
598            h1_writev: None,
599            h1_read_buf_exact_size: None,
600            h1_parser_config: Default::default(),
601            h1_title_case_headers: false,
602            h1_preserve_header_case: false,
603            #[cfg(feature = "ffi")]
604            h1_preserve_header_order: false,
605            h1_max_buf_size: None,
606            #[cfg(feature = "ffi")]
607            h1_headers_raw: false,
608            #[cfg(feature = "http2")]
609            h2_builder: Default::default(),
610            #[cfg(feature = "http1")]
611            version: Proto::Http1,
612            #[cfg(not(feature = "http1"))]
613            version: Proto::Http2,
614        }
615    }
616
617    /// Provide an executor to execute background HTTP2 tasks.
618    pub fn executor<E>(&mut self, exec: E) -> &mut Builder
619    where
620        E: Executor<BoxSendFuture> + Send + Sync + 'static,
621    {
622        self.exec = Exec::Executor(Arc::new(exec));
623        self
624    }
625
626    /// Set whether HTTP/0.9 responses should be tolerated.
627    ///
628    /// Default is false.
629    pub fn http09_responses(&mut self, enabled: bool) -> &mut Builder {
630        self.h09_responses = enabled;
631        self
632    }
633
634    /// Set whether HTTP/1 connections will accept spaces between header names
635    /// and the colon that follow them in responses.
636    ///
637    /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
638    /// to say about it:
639    ///
640    /// > No whitespace is allowed between the header field-name and colon. In
641    /// > the past, differences in the handling of such whitespace have led to
642    /// > security vulnerabilities in request routing and response handling. A
643    /// > server MUST reject any received request message that contains
644    /// > whitespace between a header field-name and colon with a response code
645    /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
646    /// > response message before forwarding the message downstream.
647    ///
648    /// Note that this setting does not affect HTTP/2.
649    ///
650    /// Default is false.
651    ///
652    /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
653    pub fn http1_allow_spaces_after_header_name_in_responses(
654        &mut self,
655        enabled: bool,
656    ) -> &mut Builder {
657        self.h1_parser_config
658            .allow_spaces_after_header_name_in_responses(enabled);
659        self
660    }
661
662    /// Set whether HTTP/1 connections will accept obsolete line folding for
663    /// header values.
664    ///
665    /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
666    /// parsing.
667    ///
668    /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
669    /// to say about it:
670    ///
671    /// > A server that receives an obs-fold in a request message that is not
672    /// > within a message/http container MUST either reject the message by
673    /// > sending a 400 (Bad Request), preferably with a representation
674    /// > explaining that obsolete line folding is unacceptable, or replace
675    /// > each received obs-fold with one or more SP octets prior to
676    /// > interpreting the field value or forwarding the message downstream.
677    ///
678    /// > A proxy or gateway that receives an obs-fold in a response message
679    /// > that is not within a message/http container MUST either discard the
680    /// > message and replace it with a 502 (Bad Gateway) response, preferably
681    /// > with a representation explaining that unacceptable line folding was
682    /// > received, or replace each received obs-fold with one or more SP
683    /// > octets prior to interpreting the field value or forwarding the
684    /// > message downstream.
685    ///
686    /// > A user agent that receives an obs-fold in a response message that is
687    /// > not within a message/http container MUST replace each received
688    /// > obs-fold with one or more SP octets prior to interpreting the field
689    /// > value.
690    ///
691    /// Note that this setting does not affect HTTP/2.
692    ///
693    /// Default is false.
694    ///
695    /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
696    pub fn http1_allow_obsolete_multiline_headers_in_responses(
697        &mut self,
698        enabled: bool,
699    ) -> &mut Builder {
700        self.h1_parser_config
701            .allow_obsolete_multiline_headers_in_responses(enabled);
702        self
703    }
704
705    /// Set whether HTTP/1 connections will silently ignored malformed header lines.
706    ///
707    /// If this is enabled and and a header line does not start with a valid header
708    /// name, or does not include a colon at all, the line will be silently ignored
709    /// and no error will be reported.
710    ///
711    /// Note that this setting does not affect HTTP/2.
712    ///
713    /// Default is false.
714    pub fn http1_ignore_invalid_headers_in_responses(&mut self, enabled: bool) -> &mut Builder {
715        self.h1_parser_config
716            .ignore_invalid_headers_in_responses(enabled);
717        self
718    }
719
720    /// Set whether HTTP/1 connections should try to use vectored writes,
721    /// or always flatten into a single buffer.
722    ///
723    /// Note that setting this to false may mean more copies of body data,
724    /// but may also improve performance when an IO transport doesn't
725    /// support vectored writes well, such as most TLS implementations.
726    ///
727    /// Setting this to true will force hyper to use queued strategy
728    /// which may eliminate unnecessary cloning on some TLS backends
729    ///
730    /// Default is `auto`. In this mode hyper will try to guess which
731    /// mode to use
732    pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder {
733        self.h1_writev = Some(enabled);
734        self
735    }
736
737    /// Set whether HTTP/1 connections will write header names as title case at
738    /// the socket level.
739    ///
740    /// Note that this setting does not affect HTTP/2.
741    ///
742    /// Default is false.
743    pub fn http1_title_case_headers(&mut self, enabled: bool) -> &mut Builder {
744        self.h1_title_case_headers = enabled;
745        self
746    }
747
748    /// Set whether to support preserving original header cases.
749    ///
750    /// Currently, this will record the original cases received, and store them
751    /// in a private extension on the `Response`. It will also look for and use
752    /// such an extension in any provided `Request`.
753    ///
754    /// Since the relevant extension is still private, there is no way to
755    /// interact with the original cases. The only effect this can have now is
756    /// to forward the cases in a proxy-like fashion.
757    ///
758    /// Note that this setting does not affect HTTP/2.
759    ///
760    /// Default is false.
761    pub fn http1_preserve_header_case(&mut self, enabled: bool) -> &mut Builder {
762        self.h1_preserve_header_case = enabled;
763        self
764    }
765
766    /// Set whether to support preserving original header order.
767    ///
768    /// Currently, this will record the order in which headers are received, and store this
769    /// ordering in a private extension on the `Response`. It will also look for and use
770    /// such an extension in any provided `Request`.
771    ///
772    /// Note that this setting does not affect HTTP/2.
773    ///
774    /// Default is false.
775    #[cfg(feature = "ffi")]
776    pub fn http1_preserve_header_order(&mut self, enabled: bool) -> &mut Builder {
777        self.h1_preserve_header_order = enabled;
778        self
779    }
780
781    /// Sets the exact size of the read buffer to *always* use.
782    ///
783    /// Note that setting this option unsets the `http1_max_buf_size` option.
784    ///
785    /// Default is an adaptive read buffer.
786    pub fn http1_read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder {
787        self.h1_read_buf_exact_size = sz;
788        self.h1_max_buf_size = None;
789        self
790    }
791
792    /// Set the maximum buffer size for the connection.
793    ///
794    /// Default is ~400kb.
795    ///
796    /// Note that setting this option unsets the `http1_read_exact_buf_size` option.
797    ///
798    /// # Panics
799    ///
800    /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
801    #[cfg(feature = "http1")]
802    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
803    pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self {
804        assert!(
805            max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
806            "the max_buf_size cannot be smaller than the minimum that h1 specifies."
807        );
808
809        self.h1_max_buf_size = Some(max);
810        self.h1_read_buf_exact_size = None;
811        self
812    }
813
814    #[cfg(feature = "ffi")]
815    pub(crate) fn http1_headers_raw(&mut self, enabled: bool) -> &mut Self {
816        self.h1_headers_raw = enabled;
817        self
818    }
819
820    /// Sets whether HTTP2 is required.
821    ///
822    /// Default is false.
823    #[cfg(feature = "http2")]
824    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
825    pub fn http2_only(&mut self, enabled: bool) -> &mut Builder {
826        if enabled {
827            self.version = Proto::Http2
828        }
829        self
830    }
831
832    /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
833    /// stream-level flow control.
834    ///
835    /// Passing `None` will do nothing.
836    ///
837    /// If not set, hyper will use a default.
838    ///
839    /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
840    #[cfg(feature = "http2")]
841    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
842    pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
843        if let Some(sz) = sz.into() {
844            self.h2_builder.adaptive_window = false;
845            self.h2_builder.initial_stream_window_size = sz;
846        }
847        self
848    }
849
850    /// Sets the max connection-level flow control for HTTP2
851    ///
852    /// Passing `None` will do nothing.
853    ///
854    /// If not set, hyper will use a default.
855    #[cfg(feature = "http2")]
856    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
857    pub fn http2_initial_connection_window_size(
858        &mut self,
859        sz: impl Into<Option<u32>>,
860    ) -> &mut Self {
861        if let Some(sz) = sz.into() {
862            self.h2_builder.adaptive_window = false;
863            self.h2_builder.initial_conn_window_size = sz;
864        }
865        self
866    }
867
868    /// Sets whether to use an adaptive flow control.
869    ///
870    /// Enabling this will override the limits set in
871    /// `http2_initial_stream_window_size` and
872    /// `http2_initial_connection_window_size`.
873    #[cfg(feature = "http2")]
874    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
875    pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
876        use proto::h2::SPEC_WINDOW_SIZE;
877
878        self.h2_builder.adaptive_window = enabled;
879        if enabled {
880            self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
881            self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
882        }
883        self
884    }
885
886    /// Sets the maximum frame size to use for HTTP2.
887    ///
888    /// Passing `None` will do nothing.
889    ///
890    /// If not set, hyper will use a default.
891    #[cfg(feature = "http2")]
892    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
893    pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
894        if let Some(sz) = sz.into() {
895            self.h2_builder.max_frame_size = sz;
896        }
897        self
898    }
899
900    /// Sets an interval for HTTP2 Ping frames should be sent to keep a
901    /// connection alive.
902    ///
903    /// Pass `None` to disable HTTP2 keep-alive.
904    ///
905    /// Default is currently disabled.
906    ///
907    /// # Cargo Feature
908    ///
909    /// Requires the `runtime` cargo feature to be enabled.
910    #[cfg(feature = "runtime")]
911    #[cfg(feature = "http2")]
912    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
913    pub fn http2_keep_alive_interval(
914        &mut self,
915        interval: impl Into<Option<Duration>>,
916    ) -> &mut Self {
917        self.h2_builder.keep_alive_interval = interval.into();
918        self
919    }
920
921    /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
922    ///
923    /// If the ping is not acknowledged within the timeout, the connection will
924    /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
925    ///
926    /// Default is 20 seconds.
927    ///
928    /// # Cargo Feature
929    ///
930    /// Requires the `runtime` cargo feature to be enabled.
931    #[cfg(feature = "runtime")]
932    #[cfg(feature = "http2")]
933    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
934    pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
935        self.h2_builder.keep_alive_timeout = timeout;
936        self
937    }
938
939    /// Sets whether HTTP2 keep-alive should apply while the connection is idle.
940    ///
941    /// If disabled, keep-alive pings are only sent while there are open
942    /// request/responses streams. If enabled, pings are also sent when no
943    /// streams are active. Does nothing if `http2_keep_alive_interval` is
944    /// disabled.
945    ///
946    /// Default is `false`.
947    ///
948    /// # Cargo Feature
949    ///
950    /// Requires the `runtime` cargo feature to be enabled.
951    #[cfg(feature = "runtime")]
952    #[cfg(feature = "http2")]
953    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
954    pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
955        self.h2_builder.keep_alive_while_idle = enabled;
956        self
957    }
958
959    /// Sets the maximum number of HTTP2 concurrent locally reset streams.
960    ///
961    /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
962    /// details.
963    ///
964    /// The default value is determined by the `h2` crate.
965    ///
966    /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
967    #[cfg(feature = "http2")]
968    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
969    pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
970        self.h2_builder.max_concurrent_reset_streams = Some(max);
971        self
972    }
973
974    /// Set the maximum write buffer size for each HTTP/2 stream.
975    ///
976    /// Default is currently 1MB, but may change.
977    ///
978    /// # Panics
979    ///
980    /// The value must be no larger than `u32::MAX`.
981    #[cfg(feature = "http2")]
982    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
983    pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
984        assert!(max <= std::u32::MAX as usize);
985        self.h2_builder.max_send_buffer_size = max;
986        self
987    }
988
989    /// Constructs a connection with the configured options and IO.
990    /// See [`client::conn`](crate::client::conn) for more.
991    ///
992    /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
993    /// do nothing.
994    pub fn handshake<T, B>(
995        &self,
996        io: T,
997    ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
998    where
999        T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
1000        B: HttpBody + 'static,
1001        B::Data: Send,
1002        B::Error: Into<Box<dyn StdError + Send + Sync>>,
1003    {
1004        let opts = self.clone();
1005
1006        async move {
1007            trace!("client handshake {:?}", opts.version);
1008
1009            let (tx, rx) = dispatch::channel();
1010            let proto = match opts.version {
1011                #[cfg(feature = "http1")]
1012                Proto::Http1 => {
1013                    let mut conn = proto::Conn::new(io);
1014                    conn.set_h1_parser_config(opts.h1_parser_config);
1015                    if let Some(writev) = opts.h1_writev {
1016                        if writev {
1017                            conn.set_write_strategy_queue();
1018                        } else {
1019                            conn.set_write_strategy_flatten();
1020                        }
1021                    }
1022                    if opts.h1_title_case_headers {
1023                        conn.set_title_case_headers();
1024                    }
1025                    if opts.h1_preserve_header_case {
1026                        conn.set_preserve_header_case();
1027                    }
1028                    #[cfg(feature = "ffi")]
1029                    if opts.h1_preserve_header_order {
1030                        conn.set_preserve_header_order();
1031                    }
1032                    if opts.h09_responses {
1033                        conn.set_h09_responses();
1034                    }
1035
1036                    #[cfg(feature = "ffi")]
1037                    conn.set_raw_headers(opts.h1_headers_raw);
1038
1039                    if let Some(sz) = opts.h1_read_buf_exact_size {
1040                        conn.set_read_buf_exact_size(sz);
1041                    }
1042                    if let Some(max) = opts.h1_max_buf_size {
1043                        conn.set_max_buf_size(max);
1044                    }
1045                    let cd = proto::h1::dispatch::Client::new(rx);
1046                    let dispatch = proto::h1::Dispatcher::new(cd, conn);
1047                    ProtoClient::H1 { h1: dispatch }
1048                }
1049                #[cfg(feature = "http2")]
1050                Proto::Http2 => {
1051                    let h2 =
1052                        proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec.clone())
1053                            .await?;
1054                    ProtoClient::H2 { h2 }
1055                }
1056            };
1057
1058            Ok((
1059                SendRequest { dispatch: tx },
1060                Connection { inner: Some(proto) },
1061            ))
1062        }
1063    }
1064}
1065
1066// ===== impl ResponseFuture
1067
1068impl Future for ResponseFuture {
1069    type Output = crate::Result<Response<Body>>;
1070
1071    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1072        match self.inner {
1073            ResponseFutureState::Waiting(ref mut rx) => {
1074                Pin::new(rx).poll(cx).map(|res| match res {
1075                    Ok(Ok(resp)) => Ok(resp),
1076                    Ok(Err(err)) => Err(err),
1077                    // this is definite bug if it happens, but it shouldn't happen!
1078                    Err(_canceled) => panic!("dispatch dropped without returning error"),
1079                })
1080            }
1081            ResponseFutureState::Error(ref mut err) => {
1082                Poll::Ready(Err(err.take().expect("polled after ready")))
1083            }
1084        }
1085    }
1086}
1087
1088impl fmt::Debug for ResponseFuture {
1089    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1090        f.debug_struct("ResponseFuture").finish()
1091    }
1092}
1093
1094// ===== impl ProtoClient
1095
1096impl<T, B> Future for ProtoClient<T, B>
1097where
1098    T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
1099    B: HttpBody + Send + 'static,
1100    B::Data: Send,
1101    B::Error: Into<Box<dyn StdError + Send + Sync>>,
1102{
1103    type Output = crate::Result<proto::Dispatched>;
1104
1105    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1106        match self.project() {
1107            #[cfg(feature = "http1")]
1108            ProtoClientProj::H1 { h1 } => h1.poll(cx),
1109            #[cfg(feature = "http2")]
1110            ProtoClientProj::H2 { h2, .. } => h2.poll(cx),
1111
1112            #[cfg(not(feature = "http1"))]
1113            ProtoClientProj::H1 { h1 } => match h1.0 {},
1114            #[cfg(not(feature = "http2"))]
1115            ProtoClientProj::H2 { h2, .. } => match h2.0 {},
1116        }
1117    }
1118}
1119
1120// assert trait markers
1121
1122trait AssertSend: Send {}
1123trait AssertSendSync: Send + Sync {}
1124
1125#[cfg_attr(feature = "deprecated", allow(deprecated))]
1126#[doc(hidden)]
1127impl<B: Send> AssertSendSync for SendRequest<B> {}
1128
1129#[cfg_attr(feature = "deprecated", allow(deprecated))]
1130#[doc(hidden)]
1131impl<T: Send, B: Send> AssertSend for Connection<T, B>
1132where
1133    T: AsyncRead + AsyncWrite + Send + 'static,
1134    B: HttpBody + 'static,
1135    B::Data: Send,
1136{
1137}
1138
1139#[cfg_attr(feature = "deprecated", allow(deprecated))]
1140#[doc(hidden)]
1141impl<T: Send + Sync, B: Send + Sync> AssertSendSync for Connection<T, B>
1142where
1143    T: AsyncRead + AsyncWrite + Send + 'static,
1144    B: HttpBody + 'static,
1145    B::Data: Send + Sync + 'static,
1146{
1147}
1148
1149#[cfg_attr(feature = "deprecated", allow(deprecated))]
1150#[doc(hidden)]
1151impl AssertSendSync for Builder {}
1152
1153#[doc(hidden)]
1154impl AssertSend for ResponseFuture {}