1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5use wasm_bindgen::JsCast;
6use wasm_streams::readable::IntoStream;
7
8use crate::Error;
9use bytes::Bytes;
10use futures_util::TryStream;
11use futures_util::TryStreamExt;
12use futures_util::{stream::FusedStream, Stream, StreamExt};
13use http_body::{Body as HttpBody, Frame};
14use js_sys::Uint8Array;
15use pin_project::pin_project;
16use wasm_bindgen::JsValue;
17
18#[derive(Debug)]
19pub struct Body(Option<IntoStream<'static>>);
20
21unsafe impl Sync for Body {}
22unsafe impl Send for Body {}
23
24impl Body {
25 pub fn new(stream: web_sys::ReadableStream) -> Self {
26 Self(Some(
27 wasm_streams::ReadableStream::from_raw(stream.unchecked_into()).into_stream(),
28 ))
29 }
30
31 pub fn into_inner(self) -> Option<web_sys::ReadableStream> {
32 self.0
33 .map(|s| wasm_streams::ReadableStream::from_stream(s).into_raw())
34 }
35
36 pub fn empty() -> Self {
37 Self(None)
38 }
39
40 pub fn from_stream<S>(stream: S) -> Result<Self, crate::Error>
42 where
43 S: TryStream + 'static,
44 S::Ok: Into<Vec<u8>>,
45 S::Error: std::fmt::Debug,
46 {
47 let js_stream = stream
48 .map_ok(|item| -> Vec<u8> { item.into() })
49 .map_ok(|chunk| {
50 let array = Uint8Array::new_with_length(chunk.len() as _);
51 array.copy_from(&chunk);
52 array.into()
53 })
54 .map_err(|err| crate::Error::RustError(format!("{err:?}")))
55 .map_err(|e| wasm_bindgen::JsValue::from(e.to_string()));
56
57 let stream = wasm_streams::ReadableStream::from_stream(js_stream);
58 let stream: web_sys::ReadableStream = stream.into_raw().dyn_into().unwrap();
59
60 Ok(Self::new(stream))
61 }
62}
63
64impl HttpBody for Body {
65 type Data = Bytes;
66 type Error = Error;
67
68 #[inline]
69 fn poll_frame(
70 mut self: Pin<&mut Self>,
71 cx: &mut Context<'_>,
72 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
73 if let Some(ref mut stream) = &mut self.0 {
74 stream
75 .poll_next_unpin(cx)
76 .map_ok(|buf| {
77 let bytes = Bytes::copy_from_slice(&js_sys::Uint8Array::from(buf).to_vec());
78 Frame::data(bytes)
79 })
80 .map_err(Error::Internal)
81 } else {
82 Poll::Ready(None)
83 }
84 }
85
86 #[inline]
87 fn size_hint(&self) -> http_body::SizeHint {
88 let mut hint = http_body::SizeHint::new();
89 if let Some(ref stream) = self.0 {
90 let (lower, upper) = stream.size_hint();
91
92 hint.set_lower(lower as u64);
93 if let Some(upper) = upper {
94 hint.set_upper(upper as u64);
95 }
96 } else {
97 hint.set_lower(0);
98 hint.set_upper(0);
99 }
100 hint
101 }
102
103 fn is_end_stream(&self) -> bool {
104 if let Some(ref stream) = self.0 {
105 stream.is_terminated()
106 } else {
107 true
108 }
109 }
110}
111
112impl Stream for Body {
113 type Item = Result<Bytes, Error>;
114
115 #[inline]
116 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
117 self.poll_frame(cx).map(|o| {
118 if let Some(r) = o {
119 match r {
120 Ok(f) => {
121 if f.is_data() {
122 let b = f.into_data().unwrap();
123 Some(Ok(b))
124 } else {
125 None
127 }
128 }
129 Err(_) => Some(Err(Error::RustError("Error polling body".to_owned()))),
130 }
131 } else {
132 None
133 }
134 })
135 }
136}
137
138#[pin_project]
139pub(crate) struct BodyStream<B> {
140 #[pin]
141 inner: B,
142}
143
144impl<B> BodyStream<B> {
145 pub(crate) fn new(inner: B) -> Self {
146 Self { inner }
147 }
148}
149
150impl<B: http_body::Body<Data = Bytes>> Stream for BodyStream<B> {
151 type Item = std::result::Result<JsValue, JsValue>;
152
153 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
154 let this = self.project();
155 let inner: Pin<&mut B> = this.inner;
156 inner.poll_frame(cx).map(|o| {
157 if let Some(r) = o {
158 match r {
159 Ok(f) => {
160 if f.is_data() {
161 let b = f.into_data().unwrap();
163 let array = Uint8Array::new_with_length(b.len() as _);
164 array.copy_from(&b);
165 Some(Ok(array.into()))
166 } else {
167 None
168 }
169 }
170 Err(_) => Some(Err(JsValue::from_str("Error polling body"))),
171 }
172 } else {
173 None
174 }
175 })
176 }
177
178 fn size_hint(&self) -> (usize, Option<usize>) {
179 let hint = self.inner.size_hint();
180 (hint.lower() as usize, hint.upper().map(|u| u as usize))
181 }
182}