1#[cfg(all(feature = "backports", feature = "http1"))]
58pub mod http1;
59#[cfg(all(feature = "backports", feature = "http2"))]
60pub mod http2;
61
62#[cfg(not(all(feature = "http1", feature = "http2")))]
63use std::convert::Infallible;
64use std::error::Error as StdError;
65use std::fmt;
66use std::future::Future;
67#[cfg(not(all(feature = "http1", feature = "http2")))]
68use std::marker::PhantomData;
69use std::marker::Unpin;
70use std::pin::Pin;
71use std::sync::Arc;
72use std::task::{Context, Poll};
73#[cfg(all(feature = "runtime", feature = "http2"))]
74use std::time::Duration;
75
76use bytes::Bytes;
77use futures_util::future::{self, Either, FutureExt as _};
78use httparse::ParserConfig;
79use pin_project_lite::pin_project;
80use tokio::io::{AsyncRead, AsyncWrite};
81use tower_service::Service;
82use tracing::{debug, trace};
83
84use super::dispatch;
85use crate::body::HttpBody;
86use crate::common::exec::{BoxSendFuture, Exec};
87use crate::proto;
88use crate::rt::Executor;
89#[cfg(feature = "http1")]
90use crate::upgrade::Upgraded;
91use crate::{Body, Request, Response};
92
93#[cfg(feature = "http1")]
94type Http1Dispatcher<T, B> =
95 proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
96
97#[cfg(not(feature = "http1"))]
98type Http1Dispatcher<T, B> = (Infallible, PhantomData<(T, Pin<Box<B>>)>);
99
100#[cfg(feature = "http2")]
101type Http2ClientTask<B> = proto::h2::ClientTask<B>;
102
103#[cfg(not(feature = "http2"))]
104type Http2ClientTask<B> = (Infallible, PhantomData<Pin<Box<B>>>);
105
106pin_project! {
107 #[project = ProtoClientProj]
108 enum ProtoClient<T, B>
109 where
110 B: HttpBody,
111 {
112 H1 {
113 #[pin]
114 h1: Http1Dispatcher<T, B>,
115 },
116 H2 {
117 #[pin]
118 h2: Http2ClientTask<B>,
119 },
120 }
121}
122
123#[cfg_attr(
128 feature = "deprecated",
129 deprecated(
130 note = "This function will be replaced with `client::conn::http1::handshake` and `client::conn::http2::handshake` in 1.0, enable the \"backports\" feature to use them now."
131 )
132)]
133#[cfg_attr(feature = "deprecated", allow(deprecated))]
134pub async fn handshake<T>(
135 io: T,
136) -> crate::Result<(SendRequest<crate::Body>, Connection<T, crate::Body>)>
137where
138 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
139{
140 #[allow(deprecated)]
141 Builder::new().handshake(io).await
142}
143
144#[cfg_attr(
146 feature = "deprecated",
147 deprecated(
148 note = "This type will be replaced with `client::conn::http1::SendRequest` and `client::conn::http2::SendRequest` in 1.0, enable the \"backports\" feature to use them now."
149 )
150)]
151pub struct SendRequest<B> {
152 dispatch: dispatch::Sender<Request<B>, Response<Body>>,
153}
154
155#[must_use = "futures do nothing unless polled"]
160#[cfg_attr(
161 feature = "deprecated",
162 deprecated(
163 note = "This type will be replaced with `client::conn::http1::Connection` and `client::conn::http2::Connection` in 1.0, enable the \"backports\" feature to use them now."
164 )
165)]
166pub struct Connection<T, B>
167where
168 T: AsyncRead + AsyncWrite + Send + 'static,
169 B: HttpBody + 'static,
170{
171 inner: Option<ProtoClient<T, B>>,
172}
173
174#[derive(Clone, Debug)]
178#[cfg_attr(
179 feature = "deprecated",
180 deprecated(
181 note = "This type will be replaced with `client::conn::http1::Builder` and `client::conn::http2::Builder` in 1.0, enable the \"backports\" feature to use them now."
182 )
183)]
184pub struct Builder {
185 pub(super) exec: Exec,
186 h09_responses: bool,
187 h1_parser_config: ParserConfig,
188 h1_writev: Option<bool>,
189 h1_title_case_headers: bool,
190 h1_preserve_header_case: bool,
191 #[cfg(feature = "ffi")]
192 h1_preserve_header_order: bool,
193 h1_read_buf_exact_size: Option<usize>,
194 h1_max_buf_size: Option<usize>,
195 #[cfg(feature = "ffi")]
196 h1_headers_raw: bool,
197 #[cfg(feature = "http2")]
198 h2_builder: proto::h2::client::Config,
199 version: Proto,
200}
201
202#[derive(Clone, Debug)]
203enum Proto {
204 #[cfg(feature = "http1")]
205 Http1,
206 #[cfg(feature = "http2")]
207 Http2,
208}
209
210#[must_use = "futures do nothing unless polled"]
214pub struct ResponseFuture {
215 inner: ResponseFutureState,
216}
217
218enum ResponseFutureState {
219 Waiting(dispatch::Promise<Response<Body>>),
220 Error(Option<crate::Error>),
222}
223
224#[derive(Debug)]
229pub struct Parts<T> {
230 pub io: T,
232 pub read_buf: Bytes,
241 _inner: (),
242}
243
244#[must_use = "futures do nothing unless polled"]
249#[cfg(feature = "http2")]
250pub(super) struct Http2SendRequest<B> {
251 dispatch: dispatch::UnboundedSender<Request<B>, Response<Body>>,
252}
253
254#[cfg_attr(feature = "deprecated", allow(deprecated))]
257impl<B> SendRequest<B> {
258 pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
262 self.dispatch.poll_ready(cx)
263 }
264
265 pub(super) async fn when_ready(self) -> crate::Result<Self> {
266 let mut me = Some(self);
267 future::poll_fn(move |cx| {
268 ready!(me.as_mut().unwrap().poll_ready(cx))?;
269 Poll::Ready(Ok(me.take().unwrap()))
270 })
271 .await
272 }
273
274 pub(super) fn is_ready(&self) -> bool {
275 self.dispatch.is_ready()
276 }
277
278 pub(super) fn is_closed(&self) -> bool {
279 self.dispatch.is_closed()
280 }
281
282 #[cfg(feature = "http2")]
283 pub(super) fn into_http2(self) -> Http2SendRequest<B> {
284 Http2SendRequest {
285 dispatch: self.dispatch.unbound(),
286 }
287 }
288}
289
290#[cfg_attr(feature = "deprecated", allow(deprecated))]
291impl<B> SendRequest<B>
292where
293 B: HttpBody + 'static,
294{
295 pub fn send_request(&mut self, req: Request<B>) -> ResponseFuture {
337 let inner = match self.dispatch.send(req) {
338 Ok(rx) => ResponseFutureState::Waiting(rx),
339 Err(_req) => {
340 debug!("connection was not ready");
341 let err = crate::Error::new_canceled().with("connection was not ready");
342 ResponseFutureState::Error(Some(err))
343 }
344 };
345
346 ResponseFuture { inner }
347 }
348
349 pub(super) fn send_request_retryable(
350 &mut self,
351 req: Request<B>,
352 ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
353 where
354 B: Send,
355 {
356 match self.dispatch.try_send(req) {
357 Ok(rx) => {
358 Either::Left(rx.then(move |res| {
359 match res {
360 Ok(Ok(res)) => future::ok(res),
361 Ok(Err(err)) => future::err(err),
362 Err(_) => panic!("dispatch dropped without returning error"),
364 }
365 }))
366 }
367 Err(req) => {
368 debug!("connection was not ready");
369 let err = crate::Error::new_canceled().with("connection was not ready");
370 Either::Right(future::err((err, Some(req))))
371 }
372 }
373 }
374}
375
376#[cfg_attr(feature = "deprecated", allow(deprecated))]
377impl<B> Service<Request<B>> for SendRequest<B>
378where
379 B: HttpBody + 'static,
380{
381 type Response = Response<Body>;
382 type Error = crate::Error;
383 type Future = ResponseFuture;
384
385 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
386 self.poll_ready(cx)
387 }
388
389 fn call(&mut self, req: Request<B>) -> Self::Future {
390 self.send_request(req)
391 }
392}
393
394#[cfg_attr(feature = "deprecated", allow(deprecated))]
395impl<B> fmt::Debug for SendRequest<B> {
396 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
397 f.debug_struct("SendRequest").finish()
398 }
399}
400
401#[cfg(feature = "http2")]
404impl<B> Http2SendRequest<B> {
405 pub(super) fn is_ready(&self) -> bool {
406 self.dispatch.is_ready()
407 }
408
409 pub(super) fn is_closed(&self) -> bool {
410 self.dispatch.is_closed()
411 }
412}
413
414#[cfg(feature = "http2")]
415impl<B> Http2SendRequest<B>
416where
417 B: HttpBody + 'static,
418{
419 pub(super) fn send_request_retryable(
420 &mut self,
421 req: Request<B>,
422 ) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>>
423 where
424 B: Send,
425 {
426 match self.dispatch.try_send(req) {
427 Ok(rx) => {
428 Either::Left(rx.then(move |res| {
429 match res {
430 Ok(Ok(res)) => future::ok(res),
431 Ok(Err(err)) => future::err(err),
432 Err(_) => panic!("dispatch dropped without returning error"),
434 }
435 }))
436 }
437 Err(req) => {
438 debug!("connection was not ready");
439 let err = crate::Error::new_canceled().with("connection was not ready");
440 Either::Right(future::err((err, Some(req))))
441 }
442 }
443 }
444}
445
446#[cfg(feature = "http2")]
447impl<B> fmt::Debug for Http2SendRequest<B> {
448 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
449 f.debug_struct("Http2SendRequest").finish()
450 }
451}
452
453#[cfg(feature = "http2")]
454impl<B> Clone for Http2SendRequest<B> {
455 fn clone(&self) -> Self {
456 Http2SendRequest {
457 dispatch: self.dispatch.clone(),
458 }
459 }
460}
461
462#[cfg_attr(feature = "deprecated", allow(deprecated))]
465impl<T, B> Connection<T, B>
466where
467 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
468 B: HttpBody + Unpin + Send + 'static,
469 B::Data: Send,
470 B::Error: Into<Box<dyn StdError + Send + Sync>>,
471{
472 pub fn into_parts(self) -> Parts<T> {
476 match self.inner.expect("already upgraded") {
477 #[cfg(feature = "http1")]
478 ProtoClient::H1 { h1 } => {
479 let (io, read_buf, _) = h1.into_inner();
480 Parts {
481 io,
482 read_buf,
483 _inner: (),
484 }
485 }
486 ProtoClient::H2 { .. } => {
487 panic!("http2 cannot into_inner");
488 }
489
490 #[cfg(not(feature = "http1"))]
491 ProtoClient::H1 { h1 } => match h1.0 {},
492 }
493 }
494
495 pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
507 match *self.inner.as_mut().expect("already upgraded") {
508 #[cfg(feature = "http1")]
509 ProtoClient::H1 { ref mut h1 } => h1.poll_without_shutdown(cx),
510 #[cfg(feature = "http2")]
511 ProtoClient::H2 { ref mut h2, .. } => Pin::new(h2).poll(cx).map_ok(|_| ()),
512
513 #[cfg(not(feature = "http1"))]
514 ProtoClient::H1 { ref mut h1 } => match h1.0 {},
515 #[cfg(not(feature = "http2"))]
516 ProtoClient::H2 { ref mut h2, .. } => match h2.0 {},
517 }
518 }
519
520 pub fn without_shutdown(self) -> impl Future<Output = crate::Result<Parts<T>>> {
523 let mut conn = Some(self);
524 future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
525 ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
526 Poll::Ready(Ok(conn.take().unwrap().into_parts()))
527 })
528 }
529
530 #[cfg(feature = "http2")]
540 pub fn http2_is_extended_connect_protocol_enabled(&self) -> bool {
541 match self.inner.as_ref().unwrap() {
542 ProtoClient::H1 { .. } => false,
543 ProtoClient::H2 { h2 } => h2.is_extended_connect_protocol_enabled(),
544 }
545 }
546}
547
548#[cfg_attr(feature = "deprecated", allow(deprecated))]
549impl<T, B> Future for Connection<T, B>
550where
551 T: AsyncRead + AsyncWrite + Unpin + Send,
552 B: HttpBody + Send + 'static,
553 B::Data: Send,
554 B::Error: Into<Box<dyn StdError + Send + Sync>>,
555{
556 type Output = crate::Result<()>;
557
558 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
559 match ready!(Pin::new(self.inner.as_mut().unwrap()).poll(cx))? {
560 proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
561 #[cfg(feature = "http1")]
562 proto::Dispatched::Upgrade(pending) => match self.inner.take() {
563 Some(ProtoClient::H1 { h1 }) => {
564 let (io, buf, _) = h1.into_inner();
565 pending.fulfill(Upgraded::new(io, buf));
566 Poll::Ready(Ok(()))
567 }
568 _ => {
569 drop(pending);
570 unreachable!("Upgrade expects h1");
571 }
572 },
573 }
574 }
575}
576
577#[cfg_attr(feature = "deprecated", allow(deprecated))]
578impl<T, B> fmt::Debug for Connection<T, B>
579where
580 T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static,
581 B: HttpBody + 'static,
582{
583 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
584 f.debug_struct("Connection").finish()
585 }
586}
587
588#[cfg_attr(feature = "deprecated", allow(deprecated))]
591impl Builder {
592 #[inline]
594 pub fn new() -> Builder {
595 Builder {
596 exec: Exec::Default,
597 h09_responses: false,
598 h1_writev: None,
599 h1_read_buf_exact_size: None,
600 h1_parser_config: Default::default(),
601 h1_title_case_headers: false,
602 h1_preserve_header_case: false,
603 #[cfg(feature = "ffi")]
604 h1_preserve_header_order: false,
605 h1_max_buf_size: None,
606 #[cfg(feature = "ffi")]
607 h1_headers_raw: false,
608 #[cfg(feature = "http2")]
609 h2_builder: Default::default(),
610 #[cfg(feature = "http1")]
611 version: Proto::Http1,
612 #[cfg(not(feature = "http1"))]
613 version: Proto::Http2,
614 }
615 }
616
617 pub fn executor<E>(&mut self, exec: E) -> &mut Builder
619 where
620 E: Executor<BoxSendFuture> + Send + Sync + 'static,
621 {
622 self.exec = Exec::Executor(Arc::new(exec));
623 self
624 }
625
626 pub fn http09_responses(&mut self, enabled: bool) -> &mut Builder {
630 self.h09_responses = enabled;
631 self
632 }
633
634 pub fn http1_allow_spaces_after_header_name_in_responses(
654 &mut self,
655 enabled: bool,
656 ) -> &mut Builder {
657 self.h1_parser_config
658 .allow_spaces_after_header_name_in_responses(enabled);
659 self
660 }
661
662 pub fn http1_allow_obsolete_multiline_headers_in_responses(
697 &mut self,
698 enabled: bool,
699 ) -> &mut Builder {
700 self.h1_parser_config
701 .allow_obsolete_multiline_headers_in_responses(enabled);
702 self
703 }
704
705 pub fn http1_ignore_invalid_headers_in_responses(&mut self, enabled: bool) -> &mut Builder {
715 self.h1_parser_config
716 .ignore_invalid_headers_in_responses(enabled);
717 self
718 }
719
720 pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder {
733 self.h1_writev = Some(enabled);
734 self
735 }
736
737 pub fn http1_title_case_headers(&mut self, enabled: bool) -> &mut Builder {
744 self.h1_title_case_headers = enabled;
745 self
746 }
747
748 pub fn http1_preserve_header_case(&mut self, enabled: bool) -> &mut Builder {
762 self.h1_preserve_header_case = enabled;
763 self
764 }
765
766 #[cfg(feature = "ffi")]
776 pub fn http1_preserve_header_order(&mut self, enabled: bool) -> &mut Builder {
777 self.h1_preserve_header_order = enabled;
778 self
779 }
780
781 pub fn http1_read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder {
787 self.h1_read_buf_exact_size = sz;
788 self.h1_max_buf_size = None;
789 self
790 }
791
792 #[cfg(feature = "http1")]
802 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
803 pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self {
804 assert!(
805 max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
806 "the max_buf_size cannot be smaller than the minimum that h1 specifies."
807 );
808
809 self.h1_max_buf_size = Some(max);
810 self.h1_read_buf_exact_size = None;
811 self
812 }
813
814 #[cfg(feature = "ffi")]
815 pub(crate) fn http1_headers_raw(&mut self, enabled: bool) -> &mut Self {
816 self.h1_headers_raw = enabled;
817 self
818 }
819
820 #[cfg(feature = "http2")]
824 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
825 pub fn http2_only(&mut self, enabled: bool) -> &mut Builder {
826 if enabled {
827 self.version = Proto::Http2
828 }
829 self
830 }
831
832 #[cfg(feature = "http2")]
841 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
842 pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
843 if let Some(sz) = sz.into() {
844 self.h2_builder.adaptive_window = false;
845 self.h2_builder.initial_stream_window_size = sz;
846 }
847 self
848 }
849
850 #[cfg(feature = "http2")]
856 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
857 pub fn http2_initial_connection_window_size(
858 &mut self,
859 sz: impl Into<Option<u32>>,
860 ) -> &mut Self {
861 if let Some(sz) = sz.into() {
862 self.h2_builder.adaptive_window = false;
863 self.h2_builder.initial_conn_window_size = sz;
864 }
865 self
866 }
867
868 #[cfg(feature = "http2")]
874 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
875 pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
876 use proto::h2::SPEC_WINDOW_SIZE;
877
878 self.h2_builder.adaptive_window = enabled;
879 if enabled {
880 self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
881 self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
882 }
883 self
884 }
885
886 #[cfg(feature = "http2")]
892 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
893 pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
894 if let Some(sz) = sz.into() {
895 self.h2_builder.max_frame_size = sz;
896 }
897 self
898 }
899
900 #[cfg(feature = "runtime")]
911 #[cfg(feature = "http2")]
912 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
913 pub fn http2_keep_alive_interval(
914 &mut self,
915 interval: impl Into<Option<Duration>>,
916 ) -> &mut Self {
917 self.h2_builder.keep_alive_interval = interval.into();
918 self
919 }
920
921 #[cfg(feature = "runtime")]
932 #[cfg(feature = "http2")]
933 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
934 pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
935 self.h2_builder.keep_alive_timeout = timeout;
936 self
937 }
938
939 #[cfg(feature = "runtime")]
952 #[cfg(feature = "http2")]
953 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
954 pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
955 self.h2_builder.keep_alive_while_idle = enabled;
956 self
957 }
958
959 #[cfg(feature = "http2")]
968 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
969 pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
970 self.h2_builder.max_concurrent_reset_streams = Some(max);
971 self
972 }
973
974 #[cfg(feature = "http2")]
982 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
983 pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
984 assert!(max <= std::u32::MAX as usize);
985 self.h2_builder.max_send_buffer_size = max;
986 self
987 }
988
989 pub fn handshake<T, B>(
995 &self,
996 io: T,
997 ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
998 where
999 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
1000 B: HttpBody + 'static,
1001 B::Data: Send,
1002 B::Error: Into<Box<dyn StdError + Send + Sync>>,
1003 {
1004 let opts = self.clone();
1005
1006 async move {
1007 trace!("client handshake {:?}", opts.version);
1008
1009 let (tx, rx) = dispatch::channel();
1010 let proto = match opts.version {
1011 #[cfg(feature = "http1")]
1012 Proto::Http1 => {
1013 let mut conn = proto::Conn::new(io);
1014 conn.set_h1_parser_config(opts.h1_parser_config);
1015 if let Some(writev) = opts.h1_writev {
1016 if writev {
1017 conn.set_write_strategy_queue();
1018 } else {
1019 conn.set_write_strategy_flatten();
1020 }
1021 }
1022 if opts.h1_title_case_headers {
1023 conn.set_title_case_headers();
1024 }
1025 if opts.h1_preserve_header_case {
1026 conn.set_preserve_header_case();
1027 }
1028 #[cfg(feature = "ffi")]
1029 if opts.h1_preserve_header_order {
1030 conn.set_preserve_header_order();
1031 }
1032 if opts.h09_responses {
1033 conn.set_h09_responses();
1034 }
1035
1036 #[cfg(feature = "ffi")]
1037 conn.set_raw_headers(opts.h1_headers_raw);
1038
1039 if let Some(sz) = opts.h1_read_buf_exact_size {
1040 conn.set_read_buf_exact_size(sz);
1041 }
1042 if let Some(max) = opts.h1_max_buf_size {
1043 conn.set_max_buf_size(max);
1044 }
1045 let cd = proto::h1::dispatch::Client::new(rx);
1046 let dispatch = proto::h1::Dispatcher::new(cd, conn);
1047 ProtoClient::H1 { h1: dispatch }
1048 }
1049 #[cfg(feature = "http2")]
1050 Proto::Http2 => {
1051 let h2 =
1052 proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec.clone())
1053 .await?;
1054 ProtoClient::H2 { h2 }
1055 }
1056 };
1057
1058 Ok((
1059 SendRequest { dispatch: tx },
1060 Connection { inner: Some(proto) },
1061 ))
1062 }
1063 }
1064}
1065
1066impl Future for ResponseFuture {
1069 type Output = crate::Result<Response<Body>>;
1070
1071 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1072 match self.inner {
1073 ResponseFutureState::Waiting(ref mut rx) => {
1074 Pin::new(rx).poll(cx).map(|res| match res {
1075 Ok(Ok(resp)) => Ok(resp),
1076 Ok(Err(err)) => Err(err),
1077 Err(_canceled) => panic!("dispatch dropped without returning error"),
1079 })
1080 }
1081 ResponseFutureState::Error(ref mut err) => {
1082 Poll::Ready(Err(err.take().expect("polled after ready")))
1083 }
1084 }
1085 }
1086}
1087
1088impl fmt::Debug for ResponseFuture {
1089 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1090 f.debug_struct("ResponseFuture").finish()
1091 }
1092}
1093
1094impl<T, B> Future for ProtoClient<T, B>
1097where
1098 T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
1099 B: HttpBody + Send + 'static,
1100 B::Data: Send,
1101 B::Error: Into<Box<dyn StdError + Send + Sync>>,
1102{
1103 type Output = crate::Result<proto::Dispatched>;
1104
1105 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1106 match self.project() {
1107 #[cfg(feature = "http1")]
1108 ProtoClientProj::H1 { h1 } => h1.poll(cx),
1109 #[cfg(feature = "http2")]
1110 ProtoClientProj::H2 { h2, .. } => h2.poll(cx),
1111
1112 #[cfg(not(feature = "http1"))]
1113 ProtoClientProj::H1 { h1 } => match h1.0 {},
1114 #[cfg(not(feature = "http2"))]
1115 ProtoClientProj::H2 { h2, .. } => match h2.0 {},
1116 }
1117 }
1118}
1119
1120trait AssertSend: Send {}
1123trait AssertSendSync: Send + Sync {}
1124
1125#[cfg_attr(feature = "deprecated", allow(deprecated))]
1126#[doc(hidden)]
1127impl<B: Send> AssertSendSync for SendRequest<B> {}
1128
1129#[cfg_attr(feature = "deprecated", allow(deprecated))]
1130#[doc(hidden)]
1131impl<T: Send, B: Send> AssertSend for Connection<T, B>
1132where
1133 T: AsyncRead + AsyncWrite + Send + 'static,
1134 B: HttpBody + 'static,
1135 B::Data: Send,
1136{
1137}
1138
1139#[cfg_attr(feature = "deprecated", allow(deprecated))]
1140#[doc(hidden)]
1141impl<T: Send + Sync, B: Send + Sync> AssertSendSync for Connection<T, B>
1142where
1143 T: AsyncRead + AsyncWrite + Send + 'static,
1144 B: HttpBody + 'static,
1145 B::Data: Send + Sync + 'static,
1146{
1147}
1148
1149#[cfg_attr(feature = "deprecated", allow(deprecated))]
1150#[doc(hidden)]
1151impl AssertSendSync for Builder {}
1152
1153#[doc(hidden)]
1154impl AssertSend for ResponseFuture {}