[go: up one dir, main page]

worker/http/
body.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5use wasm_bindgen::JsCast;
6use wasm_streams::readable::IntoStream;
7
8use crate::Error;
9use bytes::Bytes;
10use futures_util::TryStream;
11use futures_util::TryStreamExt;
12use futures_util::{stream::FusedStream, Stream, StreamExt};
13use http_body::{Body as HttpBody, Frame};
14use js_sys::Uint8Array;
15use pin_project::pin_project;
16use wasm_bindgen::JsValue;
17
/// An HTTP body backed by a JavaScript `ReadableStream`.
///
/// The wrapped value is `None` for an empty body (see [`Body::empty`]) and
/// `Some` when chunks are pulled from an underlying stream.
#[derive(Debug)]
pub struct Body(Option<IntoStream<'static>>);

// SAFETY: NOTE(review): nothing in this file establishes why these impls are
// sound. The wrapped stream originates from a JavaScript `ReadableStream`, so
// this presumably relies on the wasm target being single-threaded -- confirm
// before sharing or sending a `Body` across real threads.
unsafe impl Sync for Body {}
unsafe impl Send for Body {}
23
24impl Body {
25    pub fn new(stream: web_sys::ReadableStream) -> Self {
26        Self(Some(
27            wasm_streams::ReadableStream::from_raw(stream.unchecked_into()).into_stream(),
28        ))
29    }
30
31    pub fn into_inner(self) -> Option<web_sys::ReadableStream> {
32        self.0
33            .map(|s| wasm_streams::ReadableStream::from_stream(s).into_raw())
34    }
35
36    pub fn empty() -> Self {
37        Self(None)
38    }
39
40    /// Create a `Body` using a [`Stream`](futures_util::stream::Stream)
41    pub fn from_stream<S>(stream: S) -> Result<Self, crate::Error>
42    where
43        S: TryStream + 'static,
44        S::Ok: Into<Vec<u8>>,
45        S::Error: std::fmt::Debug,
46    {
47        let js_stream = stream
48            .map_ok(|item| -> Vec<u8> { item.into() })
49            .map_ok(|chunk| {
50                let array = Uint8Array::new_with_length(chunk.len() as _);
51                array.copy_from(&chunk);
52                array.into()
53            })
54            .map_err(|err| crate::Error::RustError(format!("{err:?}")))
55            .map_err(|e| wasm_bindgen::JsValue::from(e.to_string()));
56
57        let stream = wasm_streams::ReadableStream::from_stream(js_stream);
58        let stream: web_sys::ReadableStream = stream.into_raw().dyn_into().unwrap();
59
60        Ok(Self::new(stream))
61    }
62}
63
64impl HttpBody for Body {
65    type Data = Bytes;
66    type Error = Error;
67
68    #[inline]
69    fn poll_frame(
70        mut self: Pin<&mut Self>,
71        cx: &mut Context<'_>,
72    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
73        if let Some(ref mut stream) = &mut self.0 {
74            stream
75                .poll_next_unpin(cx)
76                .map_ok(|buf| {
77                    let bytes = Bytes::copy_from_slice(&js_sys::Uint8Array::from(buf).to_vec());
78                    Frame::data(bytes)
79                })
80                .map_err(Error::Internal)
81        } else {
82            Poll::Ready(None)
83        }
84    }
85
86    #[inline]
87    fn size_hint(&self) -> http_body::SizeHint {
88        let mut hint = http_body::SizeHint::new();
89        if let Some(ref stream) = self.0 {
90            let (lower, upper) = stream.size_hint();
91
92            hint.set_lower(lower as u64);
93            if let Some(upper) = upper {
94                hint.set_upper(upper as u64);
95            }
96        } else {
97            hint.set_lower(0);
98            hint.set_upper(0);
99        }
100        hint
101    }
102
103    fn is_end_stream(&self) -> bool {
104        if let Some(ref stream) = self.0 {
105            stream.is_terminated()
106        } else {
107            true
108        }
109    }
110}
111
112impl Stream for Body {
113    type Item = Result<Bytes, Error>;
114
115    #[inline]
116    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
117        self.poll_frame(cx).map(|o| {
118            if let Some(r) = o {
119                match r {
120                    Ok(f) => {
121                        if f.is_data() {
122                            let b = f.into_data().unwrap();
123                            Some(Ok(b))
124                        } else {
125                            // Not sure how to handle trailers in Stream
126                            None
127                        }
128                    }
129                    Err(_) => Some(Err(Error::RustError("Error polling body".to_owned()))),
130                }
131            } else {
132                None
133            }
134        })
135    }
136}
137
/// Adapter that exposes an [`http_body::Body`] as a [`Stream`] of JavaScript
/// values.
#[pin_project]
pub(crate) struct BodyStream<B> {
    /// The wrapped body. It is pin-projected so that `poll_frame`, which takes
    /// `Pin<&mut B>`, can be called on it from `poll_next`.
    #[pin]
    inner: B,
}
143
144impl<B> BodyStream<B> {
145    pub(crate) fn new(inner: B) -> Self {
146        Self { inner }
147    }
148}
149
150impl<B: http_body::Body<Data = Bytes>> Stream for BodyStream<B> {
151    type Item = std::result::Result<JsValue, JsValue>;
152
153    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
154        let this = self.project();
155        let inner: Pin<&mut B> = this.inner;
156        inner.poll_frame(cx).map(|o| {
157            if let Some(r) = o {
158                match r {
159                    Ok(f) => {
160                        if f.is_data() {
161                            // Should not be Err after checking on previous line
162                            let b = f.into_data().unwrap();
163                            let array = Uint8Array::new_with_length(b.len() as _);
164                            array.copy_from(&b);
165                            Some(Ok(array.into()))
166                        } else {
167                            None
168                        }
169                    }
170                    Err(_) => Some(Err(JsValue::from_str("Error polling body"))),
171                }
172            } else {
173                None
174            }
175        })
176    }
177
178    fn size_hint(&self) -> (usize, Option<usize>) {
179        let hint = self.inner.size_hint();
180        (hint.lower() as usize, hint.upper().map(|u| u as usize))
181    }
182}