// http_body/collect.rs

1use std::{
2    collections::VecDeque,
3    future::Future,
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8use super::Body;
9
10use bytes::{Buf, Bytes};
11use http::HeaderMap;
12use pin_project_lite::pin_project;
13
14pin_project! {
15    /// Future that resolves into a [`Collected`].
16    pub struct Collect<T>
17    where
18        T: Body,
19    {
20        #[pin]
21        body: T,
22        collected: Option<Collected<T::Data>>,
23        is_data_done: bool,
24    }
25}
26
27impl<T: Body> Collect<T> {
28    pub(crate) fn new(body: T) -> Self {
29        Self {
30            body,
31            collected: Some(Collected::default()),
32            is_data_done: false,
33        }
34    }
35}
36
37impl<T: Body> Future for Collect<T> {
38    type Output = Result<Collected<T::Data>, T::Error>;
39
40    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
41        let mut me = self.project();
42
43        loop {
44            if !*me.is_data_done {
45                match me.body.as_mut().poll_data(cx) {
46                    Poll::Ready(Some(Ok(data))) => {
47                        me.collected.as_mut().unwrap().push_data(data);
48                    }
49                    Poll::Ready(Some(Err(err))) => {
50                        return Poll::Ready(Err(err));
51                    }
52                    Poll::Ready(None) => {
53                        *me.is_data_done = true;
54                    }
55                    Poll::Pending => return Poll::Pending,
56                }
57            } else {
58                match me.body.as_mut().poll_trailers(cx) {
59                    Poll::Ready(Ok(Some(trailers))) => {
60                        me.collected.as_mut().unwrap().push_trailers(trailers);
61                        break;
62                    }
63                    Poll::Ready(Err(err)) => {
64                        return Poll::Ready(Err(err));
65                    }
66                    Poll::Ready(Ok(None)) => break,
67                    Poll::Pending => return Poll::Pending,
68                }
69            }
70        }
71
72        Poll::Ready(Ok(me.collected.take().expect("polled after complete")))
73    }
74}
75
76/// A collected body produced by [`Body::collect`] which collects all the DATA frames
77/// and trailers.
78#[derive(Debug)]
79pub struct Collected<B> {
80    bufs: BufList<B>,
81    trailers: Option<HeaderMap>,
82}
83
84impl<B: Buf> Collected<B> {
85    /// If there is a trailers frame buffered, returns a reference to it.
86    ///
87    /// Returns `None` if the body contained no trailers.
88    pub fn trailers(&self) -> Option<&HeaderMap> {
89        self.trailers.as_ref()
90    }
91
92    /// Aggregate this buffered into a [`Buf`].
93    pub fn aggregate(self) -> impl Buf {
94        self.bufs
95    }
96
97    /// Convert this body into a [`Bytes`].
98    pub fn to_bytes(mut self) -> Bytes {
99        self.bufs.copy_to_bytes(self.bufs.remaining())
100    }
101
102    fn push_data(&mut self, data: B) {
103        // Only push this frame if it has some data in it, to avoid crashing on
104        // `BufList::push`.
105        if data.has_remaining() {
106            self.bufs.push(data);
107        }
108    }
109
110    fn push_trailers(&mut self, trailers: HeaderMap) {
111        if let Some(current) = &mut self.trailers {
112            current.extend(trailers);
113        } else {
114            self.trailers = Some(trailers);
115        }
116    }
117}
118
119impl<B> Default for Collected<B> {
120    fn default() -> Self {
121        Self {
122            bufs: BufList::default(),
123            trailers: None,
124        }
125    }
126}
127
128impl<B> Unpin for Collected<B> {}
129
/// A queue of buffers, stored front to back in the order they were pushed.
#[derive(Debug)]
struct BufList<T> {
    // Front of the deque is the oldest buffer.
    bufs: VecDeque<T>,
}
134
135impl<T: Buf> BufList<T> {
136    #[inline]
137    pub(crate) fn push(&mut self, buf: T) {
138        debug_assert!(buf.has_remaining());
139        self.bufs.push_back(buf);
140    }
141
142    /*
143    #[inline]
144    pub(crate) fn pop(&mut self) -> Option<T> {
145        self.bufs.pop_front()
146    }
147    */
148}
149
150impl<T: Buf> Buf for BufList<T> {
151    #[inline]
152    fn remaining(&self) -> usize {
153        self.bufs.iter().map(|buf| buf.remaining()).sum()
154    }
155
156    #[inline]
157    fn chunk(&self) -> &[u8] {
158        self.bufs.front().map(Buf::chunk).unwrap_or_default()
159    }
160
161    #[inline]
162    fn advance(&mut self, mut cnt: usize) {
163        while cnt > 0 {
164            {
165                let front = &mut self.bufs[0];
166                let rem = front.remaining();
167                if rem > cnt {
168                    front.advance(cnt);
169                    return;
170                } else {
171                    front.advance(rem);
172                    cnt -= rem;
173                }
174            }
175            self.bufs.pop_front();
176        }
177    }
178
179    #[inline]
180    fn chunks_vectored<'t>(&'t self, dst: &mut [std::io::IoSlice<'t>]) -> usize {
181        if dst.is_empty() {
182            return 0;
183        }
184        let mut vecs = 0;
185        for buf in &self.bufs {
186            vecs += buf.chunks_vectored(&mut dst[vecs..]);
187            if vecs == dst.len() {
188                break;
189            }
190        }
191        vecs
192    }
193
194    #[inline]
195    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
196        use bytes::{BufMut, BytesMut};
197        // Our inner buffer may have an optimized version of copy_to_bytes, and if the whole
198        // request can be fulfilled by the front buffer, we can take advantage.
199        match self.bufs.front_mut() {
200            Some(front) if front.remaining() == len => {
201                let b = front.copy_to_bytes(len);
202                self.bufs.pop_front();
203                b
204            }
205            Some(front) if front.remaining() > len => front.copy_to_bytes(len),
206            _ => {
207                assert!(len <= self.remaining(), "`len` greater than remaining");
208                let mut bm = BytesMut::with_capacity(len);
209                bm.put(self.take(len));
210                bm.freeze()
211            }
212        }
213    }
214}
215
216impl<T> Default for BufList<T> {
217    fn default() -> Self {
218        BufList {
219            bufs: VecDeque::new(),
220        }
221    }
222}