reqwest/async_impl/
body.rs

1use std::fmt;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use bytes::Bytes;
7use futures_core::Stream;
8use http_body::Body as HttpBody;
9use pin_project_lite::pin_project;
10use sync_wrapper::SyncWrapper;
11#[cfg(feature = "stream")]
12use tokio::fs::File;
13use tokio::time::Sleep;
14#[cfg(feature = "stream")]
15use tokio_util::io::ReaderStream;
16
17/// An asynchronous request body.
18pub struct Body {
19    inner: Inner,
20}
21
22// The `Stream` trait isn't stable, so the impl isn't public.
23pub(crate) struct ImplStream(Body);
24
25enum Inner {
26    Reusable(Bytes),
27    Streaming {
28        body: Pin<
29            Box<
30                dyn HttpBody<Data = Bytes, Error = Box<dyn std::error::Error + Send + Sync>>
31                    + Send
32                    + Sync,
33            >,
34        >,
35        timeout: Option<Pin<Box<Sleep>>>,
36    },
37}
38
39pin_project! {
40    struct WrapStream<S> {
41        #[pin]
42        inner: SyncWrapper<S>,
43    }
44}
45
46struct WrapHyper(hyper::Body);
47
48impl Body {
49    /// Returns a reference to the internal data of the `Body`.
50    ///
51    /// `None` is returned, if the underlying data is a stream.
52    pub fn as_bytes(&self) -> Option<&[u8]> {
53        match &self.inner {
54            Inner::Reusable(bytes) => Some(bytes.as_ref()),
55            Inner::Streaming { .. } => None,
56        }
57    }
58
59    /// Wrap a futures `Stream` in a box inside `Body`.
60    ///
61    /// # Example
62    ///
63    /// ```
64    /// # use reqwest::Body;
65    /// # use futures_util;
66    /// # fn main() {
67    /// let chunks: Vec<Result<_, ::std::io::Error>> = vec![
68    ///     Ok("hello"),
69    ///     Ok(" "),
70    ///     Ok("world"),
71    /// ];
72    ///
73    /// let stream = futures_util::stream::iter(chunks);
74    ///
75    /// let body = Body::wrap_stream(stream);
76    /// # }
77    /// ```
78    ///
79    /// # Optional
80    ///
81    /// This requires the `stream` feature to be enabled.
82    #[cfg(feature = "stream")]
83    #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
84    pub fn wrap_stream<S>(stream: S) -> Body
85    where
86        S: futures_core::stream::TryStream + Send + 'static,
87        S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
88        Bytes: From<S::Ok>,
89    {
90        Body::stream(stream)
91    }
92
93    pub(crate) fn stream<S>(stream: S) -> Body
94    where
95        S: futures_core::stream::TryStream + Send + 'static,
96        S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
97        Bytes: From<S::Ok>,
98    {
99        use futures_util::TryStreamExt;
100
101        let body = Box::pin(WrapStream {
102            inner: SyncWrapper::new(stream.map_ok(Bytes::from).map_err(Into::into)),
103        });
104        Body {
105            inner: Inner::Streaming {
106                body,
107                timeout: None,
108            },
109        }
110    }
111
112    pub(crate) fn response(body: hyper::Body, timeout: Option<Pin<Box<Sleep>>>) -> Body {
113        Body {
114            inner: Inner::Streaming {
115                body: Box::pin(WrapHyper(body)),
116                timeout,
117            },
118        }
119    }
120
121    #[cfg(feature = "blocking")]
122    pub(crate) fn wrap(body: hyper::Body) -> Body {
123        Body {
124            inner: Inner::Streaming {
125                body: Box::pin(WrapHyper(body)),
126                timeout: None,
127            },
128        }
129    }
130
131    pub(crate) fn empty() -> Body {
132        Body::reusable(Bytes::new())
133    }
134
135    pub(crate) fn reusable(chunk: Bytes) -> Body {
136        Body {
137            inner: Inner::Reusable(chunk),
138        }
139    }
140
141    pub(crate) fn try_reuse(self) -> (Option<Bytes>, Self) {
142        let reuse = match self.inner {
143            Inner::Reusable(ref chunk) => Some(chunk.clone()),
144            Inner::Streaming { .. } => None,
145        };
146
147        (reuse, self)
148    }
149
150    pub(crate) fn try_clone(&self) -> Option<Body> {
151        match self.inner {
152            Inner::Reusable(ref chunk) => Some(Body::reusable(chunk.clone())),
153            Inner::Streaming { .. } => None,
154        }
155    }
156
157    pub(crate) fn into_stream(self) -> ImplStream {
158        ImplStream(self)
159    }
160
161    #[cfg(feature = "multipart")]
162    pub(crate) fn content_length(&self) -> Option<u64> {
163        match self.inner {
164            Inner::Reusable(ref bytes) => Some(bytes.len() as u64),
165            Inner::Streaming { ref body, .. } => body.size_hint().exact(),
166        }
167    }
168}
169
170impl From<hyper::Body> for Body {
171    #[inline]
172    fn from(body: hyper::Body) -> Body {
173        Self {
174            inner: Inner::Streaming {
175                body: Box::pin(WrapHyper(body)),
176                timeout: None,
177            },
178        }
179    }
180}
181
182impl From<Bytes> for Body {
183    #[inline]
184    fn from(bytes: Bytes) -> Body {
185        Body::reusable(bytes)
186    }
187}
188
189impl From<Vec<u8>> for Body {
190    #[inline]
191    fn from(vec: Vec<u8>) -> Body {
192        Body::reusable(vec.into())
193    }
194}
195
196impl From<&'static [u8]> for Body {
197    #[inline]
198    fn from(s: &'static [u8]) -> Body {
199        Body::reusable(Bytes::from_static(s))
200    }
201}
202
203impl From<String> for Body {
204    #[inline]
205    fn from(s: String) -> Body {
206        Body::reusable(s.into())
207    }
208}
209
210impl From<&'static str> for Body {
211    #[inline]
212    fn from(s: &'static str) -> Body {
213        s.as_bytes().into()
214    }
215}
216
217#[cfg(feature = "stream")]
218#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
219impl From<File> for Body {
220    #[inline]
221    fn from(file: File) -> Body {
222        Body::wrap_stream(ReaderStream::new(file))
223    }
224}
225
226impl fmt::Debug for Body {
227    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
228        f.debug_struct("Body").finish()
229    }
230}
231
232// ===== impl ImplStream =====
233
234impl HttpBody for ImplStream {
235    type Data = Bytes;
236    type Error = crate::Error;
237
238    fn poll_data(
239        mut self: Pin<&mut Self>,
240        cx: &mut Context,
241    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
242        let opt_try_chunk = match self.0.inner {
243            Inner::Streaming {
244                ref mut body,
245                ref mut timeout,
246            } => {
247                if let Some(ref mut timeout) = timeout {
248                    if let Poll::Ready(()) = timeout.as_mut().poll(cx) {
249                        return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
250                    }
251                }
252                futures_core::ready!(Pin::new(body).poll_data(cx))
253                    .map(|opt_chunk| opt_chunk.map(Into::into).map_err(crate::error::body))
254            }
255            Inner::Reusable(ref mut bytes) => {
256                if bytes.is_empty() {
257                    None
258                } else {
259                    Some(Ok(std::mem::replace(bytes, Bytes::new())))
260                }
261            }
262        };
263
264        Poll::Ready(opt_try_chunk)
265    }
266
267    fn poll_trailers(
268        self: Pin<&mut Self>,
269        _cx: &mut Context,
270    ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
271        Poll::Ready(Ok(None))
272    }
273
274    fn is_end_stream(&self) -> bool {
275        match self.0.inner {
276            Inner::Streaming { ref body, .. } => body.is_end_stream(),
277            Inner::Reusable(ref bytes) => bytes.is_empty(),
278        }
279    }
280
281    fn size_hint(&self) -> http_body::SizeHint {
282        match self.0.inner {
283            Inner::Streaming { ref body, .. } => body.size_hint(),
284            Inner::Reusable(ref bytes) => {
285                let mut hint = http_body::SizeHint::default();
286                hint.set_exact(bytes.len() as u64);
287                hint
288            }
289        }
290    }
291}
292
293impl Stream for ImplStream {
294    type Item = Result<Bytes, crate::Error>;
295
296    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
297        self.poll_data(cx)
298    }
299}
300
301// ===== impl WrapStream =====
302
303impl<S, D, E> HttpBody for WrapStream<S>
304where
305    S: Stream<Item = Result<D, E>>,
306    D: Into<Bytes>,
307    E: Into<Box<dyn std::error::Error + Send + Sync>>,
308{
309    type Data = Bytes;
310    type Error = E;
311
312    fn poll_data(
313        self: Pin<&mut Self>,
314        cx: &mut Context,
315    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
316        let item = futures_core::ready!(self.project().inner.get_pin_mut().poll_next(cx)?);
317
318        Poll::Ready(item.map(|val| Ok(val.into())))
319    }
320
321    fn poll_trailers(
322        self: Pin<&mut Self>,
323        _cx: &mut Context,
324    ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
325        Poll::Ready(Ok(None))
326    }
327}
328
329// ===== impl WrapHyper =====
330
331impl HttpBody for WrapHyper {
332    type Data = Bytes;
333    type Error = Box<dyn std::error::Error + Send + Sync>;
334
335    fn poll_data(
336        mut self: Pin<&mut Self>,
337        cx: &mut Context,
338    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
339        // safe pin projection
340        Pin::new(&mut self.0)
341            .poll_data(cx)
342            .map(|opt| opt.map(|res| res.map_err(Into::into)))
343    }
344
345    fn poll_trailers(
346        self: Pin<&mut Self>,
347        _cx: &mut Context,
348    ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
349        Poll::Ready(Ok(None))
350    }
351
352    fn is_end_stream(&self) -> bool {
353        self.0.is_end_stream()
354    }
355
356    fn size_hint(&self) -> http_body::SizeHint {
357        HttpBody::size_hint(&self.0)
358    }
359}
360
361#[cfg(test)]
362mod tests {
363    use super::Body;
364
365    #[test]
366    fn test_as_bytes() {
367        let test_data = b"Test body";
368        let body = Body::from(&test_data[..]);
369        assert_eq!(body.as_bytes(), Some(&test_data[..]));
370    }
371}