hyper/client/connect/
mod.rs

//! Connectors used by the `Client`.
//!
//! This module contains:
//!
//! - A default [`HttpConnector`][] that does DNS resolution and establishes
//!   connections over TCP.
//! - Types to build custom connectors.
//!
//! # Connectors
//!
//! A "connector" is a [`Service`][] that takes a [`Uri`][] destination, and
//! its `Response` is some type implementing [`AsyncRead`][], [`AsyncWrite`][],
//! and [`Connection`][].
//!
//! ## Custom Connectors
//!
//! A simple connector that ignores the `Uri` destination and always returns
//! a TCP connection to the same address could be written like this:
//!
//! ```rust,ignore
//! let connector = tower::service_fn(|_dst| async {
//!     tokio::net::TcpStream::connect("127.0.0.1:1337")
//! })
//! ```
//!
//! Or, fully written out:
//!
//! ```
//! # #[cfg(feature = "runtime")]
//! # mod rt {
//! use std::{future::Future, net::SocketAddr, pin::Pin, task::{self, Poll}};
//! use hyper::{service::Service, Uri};
//! use tokio::net::TcpStream;
//!
//! #[derive(Clone)]
//! struct LocalConnector;
//!
//! impl Service<Uri> for LocalConnector {
//!     type Response = TcpStream;
//!     type Error = std::io::Error;
//!     // We can't "name" an `async` generated future.
//!     type Future = Pin<Box<
//!         dyn Future<Output = Result<Self::Response, Self::Error>> + Send
//!     >>;
//!
//!     fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
//!         // This connector is always ready, but others might not be.
//!         Poll::Ready(Ok(()))
//!     }
//!
//!     fn call(&mut self, _: Uri) -> Self::Future {
//!         Box::pin(TcpStream::connect(SocketAddr::from(([127, 0, 0, 1], 1337))))
//!     }
//! }
//! # }
//! ```
//!
//! It's worth noting that for `TcpStream`s, the [`HttpConnector`][] is a
//! better starting place to extend from.
//!
//! Either of the above connectors can be used with the `Client` like this:
//!
//! ```
//! # #[cfg(feature = "runtime")]
//! # fn rt () {
//! # let connector = hyper::client::HttpConnector::new();
//! // let connector = ...
//!
//! let client = hyper::Client::builder()
//!     .build::<_, hyper::Body>(connector);
//! # }
//! ```
//!
//!
//! [`HttpConnector`]: HttpConnector
//! [`Service`]: crate::service::Service
//! [`Uri`]: ::http::Uri
//! [`AsyncRead`]: tokio::io::AsyncRead
//! [`AsyncWrite`]: tokio::io::AsyncWrite
//! [`Connection`]: Connection
82use std::fmt;
83use std::fmt::{Debug, Formatter};
84use std::ops::Deref;
85use std::sync::atomic::{AtomicBool, Ordering};
86use std::sync::Arc;
87
88use ::http::Extensions;
89use tokio::sync::watch;
90
91cfg_feature! {
92    #![feature = "tcp"]
93
94    pub use self::http::{HttpConnector, HttpInfo};
95
96    pub mod dns;
97    mod http;
98}
99
100cfg_feature! {
101    #![any(feature = "http1", feature = "http2")]
102
103    pub use self::sealed::Connect;
104}
105
106/// Describes a type returned by a connector.
107pub trait Connection {
108    /// Return metadata describing the connection.
109    fn connected(&self) -> Connected;
110}
111
112/// Extra information about the connected transport.
113///
114/// This can be used to inform recipients about things like if ALPN
115/// was used, or if connected to an HTTP proxy.
116#[derive(Debug)]
117pub struct Connected {
118    pub(super) alpn: Alpn,
119    pub(super) is_proxied: bool,
120    pub(super) extra: Option<Extra>,
121    pub(super) poisoned: PoisonPill,
122}
123
/// A shared "poisoned" flag.
///
/// Cloning a pill clones the inner `Arc`, so every clone observes and mutates
/// the same underlying boolean.
#[derive(Clone)]
pub(crate) struct PoisonPill {
    poisoned: Arc<AtomicBool>,
}

impl Debug for PoisonPill {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // print the address of the pill—this makes debugging issues much easier
        // (clones of one pill print the same address)
        write!(
            f,
            "PoisonPill@{:p} {{ poisoned: {} }}",
            self.poisoned,
            self.poisoned.load(Ordering::Relaxed)
        )
    }
}

impl PoisonPill {
    /// Creates a pill whose flag starts out `false`.
    pub(crate) fn healthy() -> Self {
        Self {
            poisoned: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Sets the flag; the change is visible through every clone of this pill.
    pub(crate) fn poison(&self) {
        self.poisoned.store(true, Ordering::Relaxed)
    }

    /// Returns the current value of the flag.
    pub(crate) fn poisoned(&self) -> bool {
        self.poisoned.load(Ordering::Relaxed)
    }
}
155
156/// [`CaptureConnection`] allows callers to capture [`Connected`] information
157///
158/// To capture a connection for a request, use [`capture_connection`].
159#[derive(Debug, Clone)]
160pub struct CaptureConnection {
161    rx: watch::Receiver<Option<Connected>>,
162}
163
164/// Capture the connection for a given request
165///
166/// When making a request with Hyper, the underlying connection must implement the [`Connection`] trait.
167/// [`capture_connection`] allows a caller to capture the returned [`Connected`] structure as soon
168/// as the connection is established.
169///
170/// *Note*: If establishing a connection fails, [`CaptureConnection::connection_metadata`] will always return none.
171///
172/// # Examples
173///
174/// **Synchronous access**:
175/// The [`CaptureConnection::connection_metadata`] method allows callers to check if a connection has been
176/// established. This is ideal for situations where you are certain the connection has already
177/// been established (e.g. after the response future has already completed).
178/// ```rust
179/// use hyper::client::connect::{capture_connection, CaptureConnection};
180/// let mut request = http::Request::builder()
181///   .uri("http://foo.com")
182///   .body(())
183///   .unwrap();
184///
185/// let captured_connection = capture_connection(&mut request);
186/// // some time later after the request has been sent...
187/// let connection_info = captured_connection.connection_metadata();
188/// println!("we are connected! {:?}", connection_info.as_ref());
189/// ```
190///
191/// **Asynchronous access**:
192/// The [`CaptureConnection::wait_for_connection_metadata`] method returns a future resolves as soon as the
193/// connection is available.
194///
195/// ```rust
196/// # #[cfg(feature  = "runtime")]
197/// # async fn example() {
198/// use hyper::client::connect::{capture_connection, CaptureConnection};
199/// let mut request = http::Request::builder()
200///   .uri("http://foo.com")
201///   .body(hyper::Body::empty())
202///   .unwrap();
203///
204/// let mut captured = capture_connection(&mut request);
205/// tokio::task::spawn(async move {
206///     let connection_info = captured.wait_for_connection_metadata().await;
207///     println!("we are connected! {:?}", connection_info.as_ref());
208/// });
209///
210/// let client = hyper::Client::new();
211/// client.request(request).await.expect("request failed");
212/// # }
213/// ```
214pub fn capture_connection<B>(request: &mut crate::http::Request<B>) -> CaptureConnection {
215    let (tx, rx) = CaptureConnection::new();
216    request.extensions_mut().insert(tx);
217    rx
218}
219
220/// TxSide for [`CaptureConnection`]
221///
222/// This is inserted into `Extensions` to allow Hyper to back channel connection info
223#[derive(Clone)]
224pub(crate) struct CaptureConnectionExtension {
225    tx: Arc<watch::Sender<Option<Connected>>>,
226}
227
228impl CaptureConnectionExtension {
229    pub(crate) fn set(&self, connected: &Connected) {
230        self.tx.send_replace(Some(connected.clone()));
231    }
232}
233
234impl CaptureConnection {
235    /// Internal API to create the tx and rx half of [`CaptureConnection`]
236    pub(crate) fn new() -> (CaptureConnectionExtension, Self) {
237        let (tx, rx) = watch::channel(None);
238        (
239            CaptureConnectionExtension { tx: Arc::new(tx) },
240            CaptureConnection { rx },
241        )
242    }
243
244    /// Retrieve the connection metadata, if available
245    pub fn connection_metadata(&self) -> impl Deref<Target = Option<Connected>> + '_ {
246        self.rx.borrow()
247    }
248
249    /// Wait for the connection to be established
250    ///
251    /// If a connection was established, this will always return `Some(...)`. If the request never
252    /// successfully connected (e.g. DNS resolution failure), this method will never return.
253    pub async fn wait_for_connection_metadata(
254        &mut self,
255    ) -> impl Deref<Target = Option<Connected>> + '_ {
256        if self.rx.borrow().is_some() {
257            return self.rx.borrow();
258        }
259        let _ = self.rx.changed().await;
260        self.rx.borrow()
261    }
262}
263
264pub(super) struct Extra(Box<dyn ExtraInner>);
265
266#[derive(Clone, Copy, Debug, PartialEq)]
267pub(super) enum Alpn {
268    H2,
269    None,
270}
271
272impl Connected {
273    /// Create new `Connected` type with empty metadata.
274    pub fn new() -> Connected {
275        Connected {
276            alpn: Alpn::None,
277            is_proxied: false,
278            extra: None,
279            poisoned: PoisonPill::healthy(),
280        }
281    }
282
283    /// Set whether the connected transport is to an HTTP proxy.
284    ///
285    /// This setting will affect if HTTP/1 requests written on the transport
286    /// will have the request-target in absolute-form or origin-form:
287    ///
288    /// - When `proxy(false)`:
289    ///
290    /// ```http
291    /// GET /guide HTTP/1.1
292    /// ```
293    ///
294    /// - When `proxy(true)`:
295    ///
296    /// ```http
297    /// GET http://hyper.rs/guide HTTP/1.1
298    /// ```
299    ///
300    /// Default is `false`.
301    pub fn proxy(mut self, is_proxied: bool) -> Connected {
302        self.is_proxied = is_proxied;
303        self
304    }
305
306    /// Determines if the connected transport is to an HTTP proxy.
307    pub fn is_proxied(&self) -> bool {
308        self.is_proxied
309    }
310
311    /// Set extra connection information to be set in the extensions of every `Response`.
312    pub fn extra<T: Clone + Send + Sync + 'static>(mut self, extra: T) -> Connected {
313        if let Some(prev) = self.extra {
314            self.extra = Some(Extra(Box::new(ExtraChain(prev.0, extra))));
315        } else {
316            self.extra = Some(Extra(Box::new(ExtraEnvelope(extra))));
317        }
318        self
319    }
320
321    /// Copies the extra connection information into an `Extensions` map.
322    pub fn get_extras(&self, extensions: &mut Extensions) {
323        if let Some(extra) = &self.extra {
324            extra.set(extensions);
325        }
326    }
327
328    /// Set that the connected transport negotiated HTTP/2 as its next protocol.
329    pub fn negotiated_h2(mut self) -> Connected {
330        self.alpn = Alpn::H2;
331        self
332    }
333
334    /// Determines if the connected transport negotiated HTTP/2 as its next protocol.
335    pub fn is_negotiated_h2(&self) -> bool {
336        self.alpn == Alpn::H2
337    }
338
339    /// Poison this connection
340    ///
341    /// A poisoned connection will not be reused for subsequent requests by the pool
342    pub fn poison(&self) {
343        self.poisoned.poison();
344        tracing::debug!(
345            poison_pill = ?self.poisoned, "connection was poisoned"
346        );
347    }
348
349    // Don't public expose that `Connected` is `Clone`, unsure if we want to
350    // keep that contract...
351    pub(super) fn clone(&self) -> Connected {
352        Connected {
353            alpn: self.alpn.clone(),
354            is_proxied: self.is_proxied,
355            extra: self.extra.clone(),
356            poisoned: self.poisoned.clone(),
357        }
358    }
359}
360
361// ===== impl Extra =====
362
363impl Extra {
364    pub(super) fn set(&self, res: &mut Extensions) {
365        self.0.set(res);
366    }
367}
368
369impl Clone for Extra {
370    fn clone(&self) -> Extra {
371        Extra(self.0.clone_box())
372    }
373}
374
375impl fmt::Debug for Extra {
376    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
377        f.debug_struct("Extra").finish()
378    }
379}
380
381trait ExtraInner: Send + Sync {
382    fn clone_box(&self) -> Box<dyn ExtraInner>;
383    fn set(&self, res: &mut Extensions);
384}
385
// This indirection allows the `Connected` to have a type-erased "extra" value,
// while that type still knows its inner extra type. This allows the correct
// TypeId to be used when inserting into `res.extensions_mut()`.
/// Wrapper around a single extra value of type `T`.
#[derive(Clone)]
struct ExtraEnvelope<T>(T);
391
392impl<T> ExtraInner for ExtraEnvelope<T>
393where
394    T: Clone + Send + Sync + 'static,
395{
396    fn clone_box(&self) -> Box<dyn ExtraInner> {
397        Box::new(self.clone())
398    }
399
400    fn set(&self, res: &mut Extensions) {
401        res.insert(self.0.clone());
402    }
403}
404
405struct ExtraChain<T>(Box<dyn ExtraInner>, T);
406
407impl<T: Clone> Clone for ExtraChain<T> {
408    fn clone(&self) -> Self {
409        ExtraChain(self.0.clone_box(), self.1.clone())
410    }
411}
412
413impl<T> ExtraInner for ExtraChain<T>
414where
415    T: Clone + Send + Sync + 'static,
416{
417    fn clone_box(&self) -> Box<dyn ExtraInner> {
418        Box::new(self.clone())
419    }
420
421    fn set(&self, res: &mut Extensions) {
422        self.0.set(res);
423        res.insert(self.1.clone());
424    }
425}
426
#[cfg(any(feature = "http1", feature = "http2"))]
pub(super) mod sealed {
    use std::error::Error as StdError;
    use std::future::Future;
    use std::marker::Unpin;

    use ::http::Uri;
    use tokio::io::{AsyncRead, AsyncWrite};

    use super::Connection;

    /// Connect to a destination, returning an IO transport.
    ///
    /// A connector receives a [`Uri`](::http::Uri) and returns a `Future` of the
    /// ready connection.
    ///
    /// # Trait Alias
    ///
    /// This is really just an *alias* for the `tower::Service` trait, with
    /// additional bounds set for convenience *inside* hyper. You don't actually
    /// implement this trait, but `tower::Service<Uri>` instead.
    // The `Sized` bound is to prevent creating `dyn Connect`, since they cannot
    // fit the `Connect` bounds because of the blanket impl for `Service`.
    pub trait Connect: Sealed + Sized {
        #[doc(hidden)]
        type _Svc: ConnectSvc;
        #[doc(hidden)]
        fn connect(self, internal_only: Internal, dst: Uri) -> <Self::_Svc as ConnectSvc>::Future;
    }

    /// Companion trait that names the connection, error and future types of a
    /// connector; `Connect` refers to it through its hidden `_Svc` type.
    #[allow(unreachable_pub)]
    pub trait ConnectSvc {
        type Connection: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static;
        type Error: Into<Box<dyn StdError + Send + Sync>>;
        type Future: Future<Output = Result<Self::Connection, Self::Error>> + Unpin + Send + 'static;

        fn connect(self, internal_only: Internal, dst: Uri) -> Self::Future;
    }

    // Blanket impl: every suitable `Service<Uri>` is a `Connect`.
    impl<S, T> Connect for S
    where
        S: tower_service::Service<Uri, Response = T> + Send + 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        S::Future: Unpin + Send,
        T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
    {
        type _Svc = S;

        fn connect(self, _: Internal, dst: Uri) -> crate::service::Oneshot<S, Uri> {
            crate::service::oneshot(self, dst)
        }
    }

    // Blanket impl with the same bounds as the `Connect` impl above.
    impl<S, T> ConnectSvc for S
    where
        S: tower_service::Service<Uri, Response = T> + Send + 'static,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        S::Future: Unpin + Send,
        T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
    {
        type Connection = T;
        type Error = S::Error;
        type Future = crate::service::Oneshot<S, Uri>;

        fn connect(self, _: Internal, dst: Uri) -> Self::Future {
            crate::service::oneshot(self, dst)
        }
    }

    // Note: unlike the two impls above, `S` is not required to be `'static` here.
    impl<S, T> Sealed for S
    where
        S: tower_service::Service<Uri, Response = T> + Send,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        S::Future: Unpin + Send,
        T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
    {
    }

    /// Supertrait of `Connect`, implemented only by the blanket impl above.
    pub trait Sealed {}
    /// Marker argument taken by the `connect` methods (the parameter is named
    /// `internal_only`).
    #[allow(missing_debug_implementations)]
    pub struct Internal;
}
509
#[cfg(test)]
mod tests {
    use super::Connected;
    use crate::client::connect::CaptureConnection;

    #[derive(Clone, Debug, PartialEq)]
    struct Ex1(usize);

    #[derive(Clone, Debug, PartialEq)]
    struct Ex2(&'static str);

    #[derive(Clone, Debug, PartialEq)]
    struct Ex3(&'static str);

    #[test]
    fn test_connected_extra() {
        let c1 = Connected::new().extra(Ex1(41));

        let mut ex = ::http::Extensions::new();

        assert_eq!(ex.get::<Ex1>(), None);

        c1.extra.as_ref().expect("c1 extra").set(&mut ex);

        assert_eq!(ex.get::<Ex1>(), Some(&Ex1(41)));
    }

    #[test]
    fn test_connected_extra_chain() {
        // If a user composes connectors and at each stage, there's "extra"
        // info to attach, it shouldn't override the previous extras.

        let c1 = Connected::new()
            .extra(Ex1(45))
            .extra(Ex2("zoom"))
            .extra(Ex3("pew pew"));

        let mut ex1 = ::http::Extensions::new();

        assert_eq!(ex1.get::<Ex1>(), None);
        assert_eq!(ex1.get::<Ex2>(), None);
        assert_eq!(ex1.get::<Ex3>(), None);

        c1.extra.as_ref().expect("c1 extra").set(&mut ex1);

        assert_eq!(ex1.get::<Ex1>(), Some(&Ex1(45)));
        assert_eq!(ex1.get::<Ex2>(), Some(&Ex2("zoom")));
        assert_eq!(ex1.get::<Ex3>(), Some(&Ex3("pew pew")));

        // Just like extensions, inserting the same type overrides previous type.
        let c2 = Connected::new()
            .extra(Ex1(33))
            .extra(Ex2("hiccup"))
            .extra(Ex1(99));

        let mut ex2 = ::http::Extensions::new();

        c2.extra.as_ref().expect("c2 extra").set(&mut ex2);

        assert_eq!(ex2.get::<Ex1>(), Some(&Ex1(99)));
        assert_eq!(ex2.get::<Ex2>(), Some(&Ex2("hiccup")));
    }

    #[test]
    fn test_sync_capture_connection() {
        let (tx, rx) = CaptureConnection::new();
        assert!(
            rx.connection_metadata().is_none(),
            "connection has not been set"
        );
        tx.set(&Connected::new().proxy(true));
        assert!(rx
            .connection_metadata()
            .as_ref()
            .expect("connected should be set")
            .is_proxied());

        // ensure it can be called multiple times
        assert!(rx
            .connection_metadata()
            .as_ref()
            .expect("connected should be set")
            .is_proxied());
    }

    #[tokio::test]
    async fn async_capture_connection() {
        let (tx, mut rx) = CaptureConnection::new();
        assert!(
            rx.connection_metadata().is_none(),
            "connection has not been set"
        );
        let test_task = tokio::spawn(async move {
            assert!(rx
                .wait_for_connection_metadata()
                .await
                .as_ref()
                .expect("connection should be set")
                .is_proxied());
            // can be awaited multiple times
            assert!(
                rx.wait_for_connection_metadata().await.is_some(),
                "should be awaitable multiple times"
            );

            assert!(rx.connection_metadata().is_some());
        });
        // can't be finished, we haven't set the connection yet
        assert!(!test_task.is_finished());
        tx.set(&Connected::new().proxy(true));

        assert!(test_task.await.is_ok());
    }

    #[tokio::test]
    async fn capture_connection_sender_side_dropped() {
        let (tx, mut rx) = CaptureConnection::new();
        assert!(
            rx.connection_metadata().is_none(),
            "connection has not been set"
        );
        // With the sender gone, waiting must resolve (to `None`) rather than hang.
        drop(tx);
        assert!(rx.wait_for_connection_metadata().await.is_none());
    }
}
640}