hyper/body/
body.rs

1use std::borrow::Cow;
2#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
3use std::convert::Infallible;
4#[cfg(feature = "stream")]
5use std::error::Error as StdError;
6use std::fmt;
7use std::future::Future;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10
11use bytes::Bytes;
12use futures_channel::mpsc;
13use futures_channel::oneshot;
14use futures_core::Stream; // for mpsc::Receiver
15#[cfg(feature = "stream")]
16use futures_util::TryStreamExt;
17use http::HeaderMap;
18use http_body::{Body as HttpBody, SizeHint};
19
20use super::DecodedLength;
21#[cfg(feature = "stream")]
22use crate::common::sync_wrapper::SyncWrapper;
23use crate::common::watch;
24#[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
25use crate::proto::h2::ping;
26
27type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
28type TrailersSender = oneshot::Sender<HeaderMap>;
29
30/// A stream of `Bytes`, used when receiving bodies.
31///
32/// A good default [`HttpBody`](crate::body::HttpBody) to use in many
33/// applications.
34///
35/// Note: To read the full body, use [`body::to_bytes`](crate::body::to_bytes())
36/// or [`body::aggregate`](crate::body::aggregate()).
37#[must_use = "streams do nothing unless polled"]
38pub struct Body {
39    kind: Kind,
40    /// Keep the extra bits in an `Option<Box<Extra>>`, so that
41    /// Body stays small in the common case (no extras needed).
42    extra: Option<Box<Extra>>,
43}
44
45enum Kind {
46    Once(Option<Bytes>),
47    Chan {
48        content_length: DecodedLength,
49        want_tx: watch::Sender,
50        data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
51        trailers_rx: oneshot::Receiver<HeaderMap>,
52    },
53    #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
54    H2 {
55        ping: ping::Recorder,
56        content_length: DecodedLength,
57        recv: h2::RecvStream,
58    },
59    #[cfg(feature = "ffi")]
60    Ffi(crate::ffi::UserBody),
61    #[cfg(feature = "stream")]
62    Wrapped(
63        SyncWrapper<
64            Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>,
65        >,
66    ),
67}
68
69struct Extra {
70    /// Allow the client to pass a future to delay the `Body` from returning
71    /// EOF. This allows the `Client` to try to put the idle connection
72    /// back into the pool before the body is "finished".
73    ///
74    /// The reason for this is so that creating a new request after finishing
75    /// streaming the body of a response could sometimes result in creating
76    /// a brand new connection, since the pool didn't know about the idle
77    /// connection yet.
78    delayed_eof: Option<DelayEof>,
79}
80
81#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
82type DelayEofUntil = oneshot::Receiver<Infallible>;
83
84enum DelayEof {
85    /// Initial state, stream hasn't seen EOF yet.
86    #[cfg(any(feature = "http1", feature = "http2"))]
87    #[cfg(feature = "client")]
88    NotEof(DelayEofUntil),
89    /// Transitions to this state once we've seen `poll` try to
90    /// return EOF (`None`). This future is then polled, and
91    /// when it completes, the Body finally returns EOF (`None`).
92    #[cfg(any(feature = "http1", feature = "http2"))]
93    #[cfg(feature = "client")]
94    Eof(DelayEofUntil),
95}
96
97/// A sender half created through [`Body::channel()`].
98///
99/// Useful when wanting to stream chunks from another thread.
100///
101/// ## Body Closing
102///
103/// Note that the request body will always be closed normally when the sender is dropped (meaning
104/// that the empty terminating chunk will be sent to the remote). If you desire to close the
105/// connection with an incomplete response (e.g. in the case of an error during asynchronous
106/// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion.
107///
108/// [`Body::channel()`]: struct.Body.html#method.channel
109/// [`Sender::abort()`]: struct.Sender.html#method.abort
110#[must_use = "Sender does nothing unless sent on"]
111pub struct Sender {
112    want_rx: watch::Receiver,
113    data_tx: BodySender,
114    trailers_tx: Option<TrailersSender>,
115}
116
117const WANT_PENDING: usize = 1;
118const WANT_READY: usize = 2;
119
120impl Body {
121    /// Create an empty `Body` stream.
122    ///
123    /// # Example
124    ///
125    /// ```
126    /// use hyper::{Body, Request};
127    ///
128    /// // create a `GET /` request
129    /// let get = Request::new(Body::empty());
130    /// ```
131    #[inline]
132    pub fn empty() -> Body {
133        Body::new(Kind::Once(None))
134    }
135
136    /// Create a `Body` stream with an associated sender half.
137    ///
138    /// Useful when wanting to stream chunks from another thread.
139    #[inline]
140    pub fn channel() -> (Sender, Body) {
141        Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false)
142    }
143
144    pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) {
145        let (data_tx, data_rx) = mpsc::channel(0);
146        let (trailers_tx, trailers_rx) = oneshot::channel();
147
148        // If wanter is true, `Sender::poll_ready()` won't becoming ready
149        // until the `Body` has been polled for data once.
150        let want = if wanter { WANT_PENDING } else { WANT_READY };
151
152        let (want_tx, want_rx) = watch::channel(want);
153
154        let tx = Sender {
155            want_rx,
156            data_tx,
157            trailers_tx: Some(trailers_tx),
158        };
159        let rx = Body::new(Kind::Chan {
160            content_length,
161            want_tx,
162            data_rx,
163            trailers_rx,
164        });
165
166        (tx, rx)
167    }
168
169    /// Wrap a futures `Stream` in a box inside `Body`.
170    ///
171    /// # Example
172    ///
173    /// ```
174    /// # use hyper::Body;
175    /// let chunks: Vec<Result<_, std::io::Error>> = vec![
176    ///     Ok("hello"),
177    ///     Ok(" "),
178    ///     Ok("world"),
179    /// ];
180    ///
181    /// let stream = futures_util::stream::iter(chunks);
182    ///
183    /// let body = Body::wrap_stream(stream);
184    /// ```
185    ///
186    /// # Optional
187    ///
188    /// This function requires enabling the `stream` feature in your
189    /// `Cargo.toml`.
190    #[cfg(feature = "stream")]
191    #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
192    pub fn wrap_stream<S, O, E>(stream: S) -> Body
193    where
194        S: Stream<Item = Result<O, E>> + Send + 'static,
195        O: Into<Bytes> + 'static,
196        E: Into<Box<dyn StdError + Send + Sync>> + 'static,
197    {
198        let mapped = stream.map_ok(Into::into).map_err(Into::into);
199        Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
200    }
201
202    fn new(kind: Kind) -> Body {
203        Body { kind, extra: None }
204    }
205
206    #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
207    pub(crate) fn h2(
208        recv: h2::RecvStream,
209        mut content_length: DecodedLength,
210        ping: ping::Recorder,
211    ) -> Self {
212        // If the stream is already EOS, then the "unknown length" is clearly
213        // actually ZERO.
214        if !content_length.is_exact() && recv.is_end_stream() {
215            content_length = DecodedLength::ZERO;
216        }
217        let body = Body::new(Kind::H2 {
218            ping,
219            content_length,
220            recv,
221        });
222
223        body
224    }
225
226    #[cfg(any(feature = "http1", feature = "http2"))]
227    #[cfg(feature = "client")]
228    pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) {
229        self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut));
230    }
231
232    fn take_delayed_eof(&mut self) -> Option<DelayEof> {
233        self.extra
234            .as_mut()
235            .and_then(|extra| extra.delayed_eof.take())
236    }
237
238    #[cfg(any(feature = "http1", feature = "http2"))]
239    fn extra_mut(&mut self) -> &mut Extra {
240        self.extra
241            .get_or_insert_with(|| Box::new(Extra { delayed_eof: None }))
242    }
243
244    fn poll_eof(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
245        match self.take_delayed_eof() {
246            #[cfg(any(feature = "http1", feature = "http2"))]
247            #[cfg(feature = "client")]
248            Some(DelayEof::NotEof(mut delay)) => match self.poll_inner(cx) {
249                ok @ Poll::Ready(Some(Ok(..))) | ok @ Poll::Pending => {
250                    self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay));
251                    ok
252                }
253                Poll::Ready(None) => match Pin::new(&mut delay).poll(cx) {
254                    Poll::Ready(Ok(never)) => match never {},
255                    Poll::Pending => {
256                        self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
257                        Poll::Pending
258                    }
259                    Poll::Ready(Err(_done)) => Poll::Ready(None),
260                },
261                Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
262            },
263            #[cfg(any(feature = "http1", feature = "http2"))]
264            #[cfg(feature = "client")]
265            Some(DelayEof::Eof(mut delay)) => match Pin::new(&mut delay).poll(cx) {
266                Poll::Ready(Ok(never)) => match never {},
267                Poll::Pending => {
268                    self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
269                    Poll::Pending
270                }
271                Poll::Ready(Err(_done)) => Poll::Ready(None),
272            },
273            #[cfg(any(
274                not(any(feature = "http1", feature = "http2")),
275                not(feature = "client")
276            ))]
277            Some(delay_eof) => match delay_eof {},
278            None => self.poll_inner(cx),
279        }
280    }
281
282    #[cfg(feature = "ffi")]
283    pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody {
284        match self.kind {
285            Kind::Ffi(ref mut body) => return body,
286            _ => {
287                self.kind = Kind::Ffi(crate::ffi::UserBody::new());
288            }
289        }
290
291        match self.kind {
292            Kind::Ffi(ref mut body) => body,
293            _ => unreachable!(),
294        }
295    }
296
297    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
298        match self.kind {
299            Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)),
300            Kind::Chan {
301                content_length: ref mut len,
302                ref mut data_rx,
303                ref mut want_tx,
304                ..
305            } => {
306                want_tx.send(WANT_READY);
307
308                match ready!(Pin::new(data_rx).poll_next(cx)?) {
309                    Some(chunk) => {
310                        len.sub_if(chunk.len() as u64);
311                        Poll::Ready(Some(Ok(chunk)))
312                    }
313                    None => Poll::Ready(None),
314                }
315            }
316            #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
317            Kind::H2 {
318                ref ping,
319                recv: ref mut h2,
320                content_length: ref mut len,
321            } => match ready!(h2.poll_data(cx)) {
322                Some(Ok(bytes)) => {
323                    let _ = h2.flow_control().release_capacity(bytes.len());
324                    len.sub_if(bytes.len() as u64);
325                    ping.record_data(bytes.len());
326                    Poll::Ready(Some(Ok(bytes)))
327                }
328                Some(Err(e)) => match e.reason() {
329                    // These reasons should cause stop of body reading, but nor fail it.
330                    // The same logic as for `AsyncRead for H2Upgraded` is applied here.
331                    Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => Poll::Ready(None),
332                    _ => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
333                },
334                None => Poll::Ready(None),
335            },
336
337            #[cfg(feature = "ffi")]
338            Kind::Ffi(ref mut body) => body.poll_data(cx),
339
340            #[cfg(feature = "stream")]
341            Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) {
342                Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
343                None => Poll::Ready(None),
344            },
345        }
346    }
347
348    #[cfg(feature = "http1")]
349    pub(super) fn take_full_data(&mut self) -> Option<Bytes> {
350        if let Kind::Once(ref mut chunk) = self.kind {
351            chunk.take()
352        } else {
353            None
354        }
355    }
356}
357
358impl Default for Body {
359    /// Returns `Body::empty()`.
360    #[inline]
361    fn default() -> Body {
362        Body::empty()
363    }
364}
365
366impl HttpBody for Body {
367    type Data = Bytes;
368    type Error = crate::Error;
369
370    fn poll_data(
371        mut self: Pin<&mut Self>,
372        cx: &mut Context<'_>,
373    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
374        self.poll_eof(cx)
375    }
376
377    fn poll_trailers(
378        #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut self: Pin<&mut Self>,
379        #[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut Context<'_>,
380    ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
381        match self.kind {
382            #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
383            Kind::H2 {
384                recv: ref mut h2,
385                ref ping,
386                ..
387            } => match ready!(h2.poll_trailers(cx)) {
388                Ok(t) => {
389                    ping.record_non_data();
390                    Poll::Ready(Ok(t))
391                }
392                Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))),
393            },
394            Kind::Chan {
395                ref mut trailers_rx,
396                ..
397            } => match ready!(Pin::new(trailers_rx).poll(cx)) {
398                Ok(t) => Poll::Ready(Ok(Some(t))),
399                Err(_) => Poll::Ready(Ok(None)),
400            },
401            #[cfg(feature = "ffi")]
402            Kind::Ffi(ref mut body) => body.poll_trailers(cx),
403            _ => Poll::Ready(Ok(None)),
404        }
405    }
406
407    fn is_end_stream(&self) -> bool {
408        match self.kind {
409            Kind::Once(ref val) => val.is_none(),
410            Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO,
411            #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
412            Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
413            #[cfg(feature = "ffi")]
414            Kind::Ffi(..) => false,
415            #[cfg(feature = "stream")]
416            Kind::Wrapped(..) => false,
417        }
418    }
419
420    fn size_hint(&self) -> SizeHint {
421        macro_rules! opt_len {
422            ($content_length:expr) => {{
423                let mut hint = SizeHint::default();
424
425                if let Some(content_length) = $content_length.into_opt() {
426                    hint.set_exact(content_length);
427                }
428
429                hint
430            }};
431        }
432
433        match self.kind {
434            Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64),
435            Kind::Once(None) => SizeHint::with_exact(0),
436            #[cfg(feature = "stream")]
437            Kind::Wrapped(..) => SizeHint::default(),
438            Kind::Chan { content_length, .. } => opt_len!(content_length),
439            #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
440            Kind::H2 { content_length, .. } => opt_len!(content_length),
441            #[cfg(feature = "ffi")]
442            Kind::Ffi(..) => SizeHint::default(),
443        }
444    }
445}
446
447impl fmt::Debug for Body {
448    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
449        #[derive(Debug)]
450        struct Streaming;
451        #[derive(Debug)]
452        struct Empty;
453        #[derive(Debug)]
454        struct Full<'a>(&'a Bytes);
455
456        let mut builder = f.debug_tuple("Body");
457        match self.kind {
458            Kind::Once(None) => builder.field(&Empty),
459            Kind::Once(Some(ref chunk)) => builder.field(&Full(chunk)),
460            _ => builder.field(&Streaming),
461        };
462
463        builder.finish()
464    }
465}
466
467/// # Optional
468///
469/// This function requires enabling the `stream` feature in your
470/// `Cargo.toml`.
471#[cfg(feature = "stream")]
472impl Stream for Body {
473    type Item = crate::Result<Bytes>;
474
475    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
476        HttpBody::poll_data(self, cx)
477    }
478}
479
480/// # Optional
481///
482/// This function requires enabling the `stream` feature in your
483/// `Cargo.toml`.
484#[cfg(feature = "stream")]
485impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body {
486    #[inline]
487    fn from(
488        stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>,
489    ) -> Body {
490        Body::new(Kind::Wrapped(SyncWrapper::new(stream.into())))
491    }
492}
493
494impl From<Bytes> for Body {
495    #[inline]
496    fn from(chunk: Bytes) -> Body {
497        if chunk.is_empty() {
498            Body::empty()
499        } else {
500            Body::new(Kind::Once(Some(chunk)))
501        }
502    }
503}
504
505impl From<Vec<u8>> for Body {
506    #[inline]
507    fn from(vec: Vec<u8>) -> Body {
508        Body::from(Bytes::from(vec))
509    }
510}
511
512impl From<&'static [u8]> for Body {
513    #[inline]
514    fn from(slice: &'static [u8]) -> Body {
515        Body::from(Bytes::from(slice))
516    }
517}
518
519impl From<Cow<'static, [u8]>> for Body {
520    #[inline]
521    fn from(cow: Cow<'static, [u8]>) -> Body {
522        match cow {
523            Cow::Borrowed(b) => Body::from(b),
524            Cow::Owned(o) => Body::from(o),
525        }
526    }
527}
528
529impl From<String> for Body {
530    #[inline]
531    fn from(s: String) -> Body {
532        Body::from(Bytes::from(s.into_bytes()))
533    }
534}
535
536impl From<&'static str> for Body {
537    #[inline]
538    fn from(slice: &'static str) -> Body {
539        Body::from(Bytes::from(slice.as_bytes()))
540    }
541}
542
543impl From<Cow<'static, str>> for Body {
544    #[inline]
545    fn from(cow: Cow<'static, str>) -> Body {
546        match cow {
547            Cow::Borrowed(b) => Body::from(b),
548            Cow::Owned(o) => Body::from(o),
549        }
550    }
551}
552
553impl Sender {
554    /// Check to see if this `Sender` can send more data.
555    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
556        // Check if the receiver end has tried polling for the body yet
557        ready!(self.poll_want(cx)?);
558        self.data_tx
559            .poll_ready(cx)
560            .map_err(|_| crate::Error::new_closed())
561    }
562
563    fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
564        match self.want_rx.load(cx) {
565            WANT_READY => Poll::Ready(Ok(())),
566            WANT_PENDING => Poll::Pending,
567            watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())),
568            unexpected => unreachable!("want_rx value: {}", unexpected),
569        }
570    }
571
572    async fn ready(&mut self) -> crate::Result<()> {
573        futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
574    }
575
576    /// Send data on data channel when it is ready.
577    pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
578        self.ready().await?;
579        self.data_tx
580            .try_send(Ok(chunk))
581            .map_err(|_| crate::Error::new_closed())
582    }
583
584    /// Send trailers on trailers channel.
585    pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
586        let tx = match self.trailers_tx.take() {
587            Some(tx) => tx,
588            None => return Err(crate::Error::new_closed()),
589        };
590        tx.send(trailers).map_err(|_| crate::Error::new_closed())
591    }
592
593    /// Try to send data on this channel.
594    ///
595    /// # Errors
596    ///
597    /// Returns `Err(Bytes)` if the channel could not (currently) accept
598    /// another `Bytes`.
599    ///
600    /// # Note
601    ///
602    /// This is mostly useful for when trying to send from some other thread
603    /// that doesn't have an async context. If in an async context, prefer
604    /// `send_data()` instead.
605    pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
606        self.data_tx
607            .try_send(Ok(chunk))
608            .map_err(|err| err.into_inner().expect("just sent Ok"))
609    }
610
611    /// Aborts the body in an abnormal fashion.
612    pub fn abort(mut self) {
613        self.send_error(crate::Error::new_body_write_aborted());
614    }
615
616    pub(crate) fn send_error(&mut self, err: crate::Error) {
617        let _ = self
618            .data_tx
619            // clone so the send works even if buffer is full
620            .clone()
621            .try_send(Err(err));
622    }
623}
624
625impl fmt::Debug for Sender {
626    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
627        #[derive(Debug)]
628        struct Open;
629        #[derive(Debug)]
630        struct Closed;
631
632        let mut builder = f.debug_tuple("Sender");
633        match self.want_rx.peek() {
634            watch::CLOSED => builder.field(&Closed),
635            _ => builder.field(&Open),
636        };
637
638        builder.finish()
639    }
640}
641
642#[cfg(test)]
643mod tests {
644    use std::mem;
645    use std::task::Poll;
646
647    use super::{Body, DecodedLength, HttpBody, Sender, SizeHint};
648
649    #[test]
650    fn test_size_of() {
651        // These are mostly to help catch *accidentally* increasing
652        // the size by too much.
653
654        let body_size = mem::size_of::<Body>();
655        let body_expected_size = mem::size_of::<u64>() * 6;
656        assert!(
657            body_size <= body_expected_size,
658            "Body size = {} <= {}",
659            body_size,
660            body_expected_size,
661        );
662
663        assert_eq!(body_size, mem::size_of::<Option<Body>>(), "Option<Body>");
664
665        assert_eq!(
666            mem::size_of::<Sender>(),
667            mem::size_of::<usize>() * 5,
668            "Sender"
669        );
670
671        assert_eq!(
672            mem::size_of::<Sender>(),
673            mem::size_of::<Option<Sender>>(),
674            "Option<Sender>"
675        );
676    }
677
678    #[test]
679    fn size_hint() {
680        fn eq(body: Body, b: SizeHint, note: &str) {
681            let a = body.size_hint();
682            assert_eq!(a.lower(), b.lower(), "lower for {:?}", note);
683            assert_eq!(a.upper(), b.upper(), "upper for {:?}", note);
684        }
685
686        eq(Body::from("Hello"), SizeHint::with_exact(5), "from str");
687
688        eq(Body::empty(), SizeHint::with_exact(0), "empty");
689
690        eq(Body::channel().1, SizeHint::new(), "channel");
691
692        eq(
693            Body::new_channel(DecodedLength::new(4), /*wanter =*/ false).1,
694            SizeHint::with_exact(4),
695            "channel with length",
696        );
697    }
698
699    #[tokio::test]
700    async fn channel_abort() {
701        let (tx, mut rx) = Body::channel();
702
703        tx.abort();
704
705        let err = rx.data().await.unwrap().unwrap_err();
706        assert!(err.is_body_write_aborted(), "{:?}", err);
707    }
708
709    #[tokio::test]
710    async fn channel_abort_when_buffer_is_full() {
711        let (mut tx, mut rx) = Body::channel();
712
713        tx.try_send_data("chunk 1".into()).expect("send 1");
714        // buffer is full, but can still send abort
715        tx.abort();
716
717        let chunk1 = rx.data().await.expect("item 1").expect("chunk 1");
718        assert_eq!(chunk1, "chunk 1");
719
720        let err = rx.data().await.unwrap().unwrap_err();
721        assert!(err.is_body_write_aborted(), "{:?}", err);
722    }
723
724    #[test]
725    fn channel_buffers_one() {
726        let (mut tx, _rx) = Body::channel();
727
728        tx.try_send_data("chunk 1".into()).expect("send 1");
729
730        // buffer is now full
731        let chunk2 = tx.try_send_data("chunk 2".into()).expect_err("send 2");
732        assert_eq!(chunk2, "chunk 2");
733    }
734
735    #[tokio::test]
736    async fn channel_empty() {
737        let (_, mut rx) = Body::channel();
738
739        assert!(rx.data().await.is_none());
740    }
741
742    #[test]
743    fn channel_ready() {
744        let (mut tx, _rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ false);
745
746        let mut tx_ready = tokio_test::task::spawn(tx.ready());
747
748        assert!(tx_ready.poll().is_ready(), "tx is ready immediately");
749    }
750
751    #[test]
752    fn channel_wanter() {
753        let (mut tx, mut rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);
754
755        let mut tx_ready = tokio_test::task::spawn(tx.ready());
756        let mut rx_data = tokio_test::task::spawn(rx.data());
757
758        assert!(
759            tx_ready.poll().is_pending(),
760            "tx isn't ready before rx has been polled"
761        );
762
763        assert!(rx_data.poll().is_pending(), "poll rx.data");
764        assert!(tx_ready.is_woken(), "rx poll wakes tx");
765
766        assert!(
767            tx_ready.poll().is_ready(),
768            "tx is ready after rx has been polled"
769        );
770    }
771
772    #[test]
773    fn channel_notices_closure() {
774        let (mut tx, rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);
775
776        let mut tx_ready = tokio_test::task::spawn(tx.ready());
777
778        assert!(
779            tx_ready.poll().is_pending(),
780            "tx isn't ready before rx has been polled"
781        );
782
783        drop(rx);
784        assert!(tx_ready.is_woken(), "dropping rx wakes tx");
785
786        match tx_ready.poll() {
787            Poll::Ready(Err(ref e)) if e.is_closed() => (),
788            unexpected => panic!("tx poll ready unexpected: {:?}", unexpected),
789        }
790    }
791}